@@ -490,7 +498,7 @@ that id over a Linda once that thread is done (as the last thing you do).
Cancelling
- bool= lane_h:cancel( [timeout_secs=0.0,] [force_kill_bool=false] )
+ bool[,reason]= lane_h:cancel( [timeout_secs=0.0,] [force_kill_bool=false] )
@@ -507,6 +515,9 @@ If the lane is still running and force_kill is true, the
OS thread running the lane is forcefully killed. This means no GC, and should
generally be the last resort.
+
+Starting with v3.3.0, if cancel() returns false, it also returns either "timeout" or "killed" as second return value.
+
Cancellation is tested before going to sleep in receive() or send() calls
and after executing cancelstep Lua statements. Starting with version 3.0-beta, a pending receive()
or send() call is awakened. This means the execution of the lane will resume although the operation has
diff --git a/src/lanes.c b/src/lanes.c
index 9f455b2..462999f 100644
--- a/src/lanes.c
+++ b/src/lanes.c
@@ -47,16 +47,17 @@
*
* To-do:
*
- * Make waiting threads cancelable.
+ * Make waiting threads cancellable.
* ...
*/
-char const* VERSION = "3.2.0";
+char const* VERSION = "3.3.0";
/*
===============================================================================
Copyright (C) 2007-10 Asko Kauppi
+ 2011-12 Benoit Germain
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
@@ -136,12 +137,12 @@ struct s_lane {
// S: reads to see if cancel is requested
#if THREADWAIT_METHOD == THREADWAIT_CONDVAR
- SIGNAL_T done_signal_;
+ SIGNAL_T done_signal;
//
// M: Waited upon at lane ending (if Posix with no PTHREAD_TIMEDJOIN)
// S: sets the signal once cancellation is noticed (avoids a kill)
- MUTEX_T done_lock_;
+ MUTEX_T done_lock;
//
// Lock required by 'done_signal' condition variable, protecting
// lane status changes to DONE/ERROR_ST/CANCELLED.
@@ -256,7 +257,7 @@ static void check_key_types( lua_State *L, int _start, int _end)
{
continue;
}
- luaL_error( L, "argument #%d: invalid key type (not a boolean, string, number or light userdata)", i);
+ (void) luaL_error( L, "argument #%d: invalid key type (not a boolean, string, number or light userdata)", i);
}
}
@@ -295,7 +296,7 @@ LUAG_FUNC( linda_send)
// make sure there is something to send
if( (uint_t)lua_gettop( L) == key_i)
{
- luaL_error( L, "no data to send");
+ return luaL_error( L, "no data to send");
}
// convert nils to some special non-nil sentinel in sent values
@@ -353,7 +354,8 @@ LUAG_FUNC( linda_send)
STACK_END(L,0)
if( s)
{
- prev_status = s->status;
+ prev_status = s->status; // RUNNING, most likely
+ ASSERT_L( prev_status == RUNNING); // but check, just in case
s->status = WAITING;
ASSERT_L( s->waiting_on == NULL);
s->waiting_on = &linda->read_happened;
@@ -382,7 +384,7 @@ LUAG_FUNC( linda_send)
// must trigger error after keeper state has been released
if( pushed < 0)
{
- luaL_error( L, "tried to copy unsupported types");
+ return luaL_error( L, "tried to copy unsupported types");
}
if( cancel)
@@ -446,7 +448,7 @@ LUAG_FUNC( linda_receive)
expected_pushed_max = (int)luaL_optinteger( L, key_i + 2, expected_pushed_min);
if( expected_pushed_min > expected_pushed_max)
{
- luaL_error( L, "batched min/max error");
+ return luaL_error( L, "batched min/max error");
}
}
else
@@ -507,7 +509,8 @@ LUAG_FUNC( linda_receive)
STACK_END(L, 0)
if( s)
{
- prev_status = s->status;
+ prev_status = s->status; // RUNNING, most likely
+ ASSERT_L( prev_status == RUNNING); // but check, just in case
s->status = WAITING;
ASSERT_L( s->waiting_on == NULL);
s->waiting_on = &linda->write_happened;
@@ -535,7 +538,7 @@ LUAG_FUNC( linda_receive)
// must trigger error after keeper state has been released
if( pushed < 0)
{
- luaL_error( L, "tried to copy unsupported types");
+ return luaL_error( L, "tried to copy unsupported types");
}
if( cancel)
@@ -583,7 +586,7 @@ LUAG_FUNC( linda_set)
// must trigger error after keeper state has been released
if( pushed < 0)
{
- luaL_error( L, "tried to copy unsupported types");
+ return luaL_error( L, "tried to copy unsupported types");
}
}
@@ -611,7 +614,7 @@ LUAG_FUNC( linda_count)
keeper_release( K);
if( pushed < 0)
{
- luaL_error( L, "tried to count an invalid key");
+ return luaL_error( L, "tried to count an invalid key");
}
}
return pushed;
@@ -645,7 +648,7 @@ LUAG_FUNC( linda_get)
// must trigger error after keeper state has been released
if( pushed < 0)
{
- luaL_error( L, "tried to copy unsupported types");
+ return luaL_error( L, "tried to copy unsupported types");
}
}
@@ -678,7 +681,7 @@ LUAG_FUNC( linda_limit)
// must trigger error after keeper state has been released
if( pushed < 0)
{
- luaL_error( L, "tried to copy unsupported types");
+ return luaL_error( L, "tried to copy unsupported types");
}
}
@@ -1125,11 +1128,9 @@ static int selfdestruct_gc( lua_State *L)
// Tested on MacBook Core Duo 2GHz and 10.5.5:
// -- AKa 25-Oct-2008
//
- #ifndef ATEXIT_WAIT_SECS
- # define ATEXIT_WAIT_SECS (0.25)
- #endif
{
- double t_until= now_secs() + ATEXIT_WAIT_SECS;
+ lua_Number const shutdown_timeout = lua_tonumber( L, lua_upvalueindex( 1));
+ double const t_until = now_secs() + shutdown_timeout;
while( selfdestruct_first != SELFDESTRUCT_END )
{
@@ -1153,7 +1154,7 @@ static int selfdestruct_gc( lua_State *L)
t_now = now_secs();
if( n == 0 || ( t_now >= t_until))
{
- DEBUGEXEC(fprintf( stderr, "%d uncancelled lane(s) remain after waiting %fs at process end.\n", n, ATEXIT_WAIT_SECS - (t_until - t_now)));
+ DEBUGEXEC(fprintf( stderr, "%d uncancelled lane(s) remain after waiting %fs at process end.\n", n, shutdown_timeout - (t_until - t_now)));
break;
}
}
@@ -1201,13 +1202,13 @@ static int selfdestruct_gc( lua_State *L)
THREAD_KILL( &s->thread);
#if THREADAPI == THREADAPI_PTHREAD
// pthread: make sure the thread is really stopped!
- THREAD_WAIT( &s->thread, -1, &s->done_signal_, &s->done_lock_, &s->status);
+ THREAD_WAIT( &s->thread, -1, &s->done_signal, &s->done_lock, &s->status);
#endif // THREADAPI == THREADAPI_PTHREAD
}
// NO lua_close() in this case because we don't know where execution of the state was interrupted
#if THREADWAIT_METHOD == THREADWAIT_CONDVAR
- SIGNAL_FREE( &s->done_signal_);
- MUTEX_FREE( &s->done_lock_);
+ SIGNAL_FREE( &s->done_signal);
+ MUTEX_FREE( &s->done_lock);
#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
free( s);
s = next_s;
@@ -1288,22 +1289,26 @@ LUAG_FUNC( cancel_test)
// Limits the process to use only 'cores' CPU cores. To be used for performance
// testing on multicore devices. DEBUGGING ONLY!
//
-LUAG_FUNC( _single ) {
- uint_t cores= luaG_optunsigned(L,1,1);
+LUAG_FUNC( set_singlethreaded)
+{
+ uint_t cores = luaG_optunsigned( L, 1, 1);
+ (void) cores; // prevent "unused" warning
#ifdef PLATFORM_OSX
- #ifdef _UTILBINDTHREADTOCPU
- if (cores > 1) luaL_error( L, "Limiting to N>1 cores not possible." );
- // requires 'chudInitialize()'
- utilBindThreadToCPU(0); // # of CPU to run on (we cannot limit to 2..N CPUs?)
- #else
- luaL_error( L, "Not available: compile with _UTILBINDTHREADTOCPU" );
- #endif
+#ifdef _UTILBINDTHREADTOCPU
+ if( cores > 1)
+ {
+ return luaL_error( L, "Limiting to N>1 cores not possible");
+ }
+ // requires 'chudInitialize()'
+ utilBindThreadToCPU(0); // # of CPU to run on (we cannot limit to 2..N CPUs?)
#else
- luaL_error( L, "not implemented!" );
+ return luaL_error( L, "Not available: compile with _UTILBINDTHREADTOCPU");
#endif
- (void)cores;
-
+#else
+ return luaL_error( L, "not implemented");
+#endif
+
return 0;
}
@@ -1592,8 +1597,8 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main( void *vs)
s->L = L = 0;
#if THREADWAIT_METHOD == THREADWAIT_CONDVAR
- SIGNAL_FREE( &s->done_signal_);
- MUTEX_FREE( &s->done_lock_);
+ SIGNAL_FREE( &s->done_signal);
+ MUTEX_FREE( &s->done_lock);
#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
free(s);
@@ -1611,14 +1616,14 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main( void *vs)
// 'done_lock' protects the -> DONE|ERROR_ST|CANCELLED state change
//
#if THREADWAIT_METHOD == THREADWAIT_CONDVAR
- MUTEX_LOCK( &s->done_lock_);
+ MUTEX_LOCK( &s->done_lock);
{
#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
s->status = st;
#if THREADWAIT_METHOD == THREADWAIT_CONDVAR
- SIGNAL_ONE( &s->done_signal_); // wake up master (while 's->done_lock' is on)
+ SIGNAL_ONE( &s->done_signal); // wake up master (while 's->done_lock' is on)
}
- MUTEX_UNLOCK( &s->done_lock_);
+ MUTEX_UNLOCK( &s->done_lock);
#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
}
return 0; // ignored
@@ -1680,8 +1685,7 @@ LUAG_FUNC( thread_new )
if (prio < THREAD_PRIO_MIN || prio > THREAD_PRIO_MAX)
{
- luaL_error( L, "Priority out of range: %d..+%d (%d)",
- THREAD_PRIO_MIN, THREAD_PRIO_MAX, prio );
+ return luaL_error( L, "Priority out of range: %d..+%d (%d)", THREAD_PRIO_MIN, THREAD_PRIO_MAX, prio);
}
/* --- Create and prepare the sub state --- */
@@ -1689,7 +1693,10 @@ LUAG_FUNC( thread_new )
// populate with selected libraries at the same time
//
L2 = luaG_newstate( libs, on_state_create);
- if (!L2) luaL_error( L, "'luaL_newstate()' failed; out of memory" );
+ if (!L2)
+ {
+ return luaL_error( L, "'luaL_newstate()' failed; out of memory");
+ }
STACK_GROW( L, 2);
STACK_GROW( L2, 3);
@@ -1701,8 +1708,10 @@ LUAG_FUNC( thread_new )
STACK_CHECK(L2)
if( package)
{
- if (lua_type(L,package) != LUA_TTABLE)
- luaL_error( L, "expected package as table, got %s", luaL_typename(L,package));
+ if( lua_type( L, package) != LUA_TTABLE)
+ {
+ return luaL_error( L, "expected package as table, got %s", luaL_typename( L, package));
+ }
lua_getglobal( L2, "package");
if( !lua_isnil( L2, -1)) // package library not loaded: do nothing
{
@@ -1746,13 +1755,15 @@ LUAG_FUNC( thread_new )
int nbRequired = 1;
// should not happen, was checked in lanes.lua before calling thread_new()
if (lua_type(L, required) != LUA_TTABLE)
- luaL_error( L, "expected required module list as a table, got %s", luaL_typename( L, required));
+ {
+ return luaL_error( L, "expected required module list as a table, got %s", luaL_typename( L, required));
+ }
lua_pushnil( L);
while( lua_next( L, required) != 0)
{
if (lua_type(L,-1) != LUA_TSTRING || lua_type(L,-2) != LUA_TNUMBER || lua_tonumber( L, -2) != nbRequired)
{
- luaL_error( L, "required module list should be a list of strings.");
+ return luaL_error( L, "required module list should be a list of strings");
}
else
{
@@ -1772,8 +1783,10 @@ LUAG_FUNC( thread_new )
{
STACK_CHECK(L)
STACK_CHECK(L2)
- if (!lua_istable(L,glob))
- luaL_error( L, "Expected table, got %s", luaL_typename(L,glob));
+ if( !lua_istable( L, glob))
+ {
+ return luaL_error( L, "Expected table, got %s", luaL_typename(L,glob));
+ }
lua_pushnil( L);
lua_pushglobaltable( L2); // Lua 5.2 wants us to push the globals table on the stack
@@ -1799,7 +1812,9 @@ LUAG_FUNC( thread_new )
{
lua_pushvalue( L, 1);
if( luaG_inter_move( L, L2, 1) != 0) // L->L2
- luaL_error( L, "tried to copy unsupported types");
+ {
+ return luaL_error( L, "tried to copy unsupported types");
+ }
STACK_MID(L,0)
}
else if( lua_type(L, 1) == LUA_TSTRING)
@@ -1807,7 +1822,7 @@ LUAG_FUNC( thread_new )
// compile the string
if( luaL_loadstring( L2, lua_tostring( L, 1)) != 0)
{
- luaL_error( L, "error when parsing lane function code");
+ return luaL_error( L, "error when parsing lane function code");
}
}
@@ -1817,7 +1832,9 @@ LUAG_FUNC( thread_new )
// revive arguments
//
if( (args > 0) && (luaG_inter_copy( L, L2, args) != 0)) // L->L2
- luaL_error( L, "tried to copy unsupported types");
+ {
+ return luaL_error( L, "tried to copy unsupported types");
+ }
STACK_MID(L,0)
ASSERT_L( (uint_t)lua_gettop(L2) == 1+args );
@@ -1839,8 +1856,8 @@ LUAG_FUNC( thread_new )
s->cancel_request= FALSE;
#if THREADWAIT_METHOD == THREADWAIT_CONDVAR
- MUTEX_INIT( &s->done_lock_);
- SIGNAL_INIT( &s->done_signal_);
+ MUTEX_INIT( &s->done_lock);
+ SIGNAL_INIT( &s->done_signal);
#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
s->mstatus= NORMAL;
s->selfdestruct_next= NULL;
@@ -1884,50 +1901,44 @@ LUAG_FUNC( thread_new )
// * Why NOT cancel/kill a loose thread:
//
// At least timer system uses a free-running thread, they should be handy
-// and the issue of cancelling/killing threads at gc is not very nice, either
+// and the issue of canceling/killing threads at gc is not very nice, either
// (would easily cause waits at gc cycle, which we don't want).
//
-// * Why YES kill a loose thread:
-//
-// Current way causes segfaults at program exit, if free-running threads are
-// in certain stages. Details are not clear, but this is the core reason.
-// If gc would kill threads then at process exit only one thread would remain.
-//
-// Todo: Maybe we should have a clear #define for selecting either behaviour.
-//
-LUAG_FUNC( thread_gc )
+LUAG_FUNC( thread_gc)
{
- struct s_lane *s= lua_toLane(L,1);
+ struct s_lane* s = lua_toLane( L, 1);
// We can read 's->status' without locks, but not wait for it
- //
- if (s->status < DONE)
- {
- //
- selfdestruct_add(s);
- assert( s->selfdestruct_next );
- return 0;
-
- }
- else if (s->mstatus==KILLED)
+ // test KILLED state first, as it doesn't need to enter the selfdestruct chain
+ if( s->mstatus == KILLED)
{
// Make sure a kill has proceeded, before cleaning up the data structure.
//
// NO lua_close() in this case because we don't know where execution of the state was interrupted
- // If not doing 'THREAD_WAIT()' we should close the Lua state here
- // (can it be out of order, since we killed the lane abruptly?)
- //
-#if 0
- lua_close( s->L );
- s->L = 0;
-#else // 0
DEBUGEXEC(fprintf( stderr, "** Joining with a killed thread (needs testing) **" ));
- THREAD_WAIT( &s->thread, -1, &s->done_signal_, &s->done_lock_, &s->status);
+ // make sure the thread is no longer running, just like thread_join()
+ if(! THREAD_ISNULL( s->thread))
+ THREAD_WAIT( &s->thread, -1, &s->done_signal, &s->done_lock, &s->status);
+ // we know the thread was killed while the Lua VM was not doing anything: we should be able to close it without crashing
+ // now, thread_cancel() will not forcefully kill a lane with s->status >= DONE, so I am not sure it can ever happen
+ if( s->status >= DONE && s->L)
+ {
+ lua_close( s->L);
+ s->L = 0;
+ }
DEBUGEXEC(fprintf( stderr, "** Joined ok **" ));
-#endif // 0
+ }
+ else if( s->status < DONE)
+ {
+ // still running: will have to be cleaned up later
+ selfdestruct_add( s);
+ assert( s->selfdestruct_next);
+ return 0;
+
}
else if( s->L)
{
+ // no longer accessing the Lua VM: we can close right now
lua_close( s->L);
s->L = 0;
}
@@ -1935,12 +1946,11 @@ LUAG_FUNC( thread_gc )
// Clean up after a (finished) thread
//
#if THREADWAIT_METHOD == THREADWAIT_CONDVAR
- SIGNAL_FREE( &s->done_signal_);
- MUTEX_FREE( &s->done_lock_);
+ SIGNAL_FREE( &s->done_signal);
+ MUTEX_FREE( &s->done_lock);
#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
free( s);
-
return 0;
}
@@ -1961,17 +1971,29 @@ LUAG_FUNC( thread_gc )
// managed to cancel it.
// false if the cancellation timed out, or a kill was needed.
//
-static bool_t thread_cancel( struct s_lane *s, double secs, bool_t force)
+
+typedef enum
+{
+ CR_Timeout,
+ CR_Cancelled,
+ CR_Killed
+} cancel_result;
+
+static cancel_result thread_cancel( struct s_lane *s, double secs, bool_t force)
{
- bool_t done= TRUE;
+ cancel_result result;
+
+ // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here
// We can read 's->status' without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN)
- //
- if( s->status < DONE)
+ if( s->mstatus == KILLED)
+ {
+ result = CR_Killed;
+ }
+ else if( s->status < DONE)
{
s->cancel_request = TRUE; // it's now signaled to stop
// signal the linda the wake up the thread so that it can react to the cancel query
// let us hope we never land here with a pointer on a linda that has been destroyed...
- //MUTEX_LOCK( &selfdestruct_cs );
{
SIGNAL_T *waiting_on = s->waiting_on;
if( s->status == WAITING && waiting_on != NULL)
@@ -1979,10 +2001,10 @@ static bool_t thread_cancel( struct s_lane *s, double secs, bool_t force)
SIGNAL_ALL( waiting_on);
}
}
- //MUTEX_UNLOCK( &selfdestruct_cs );
- done = THREAD_WAIT( &s->thread, secs, &s->done_signal_, &s->done_lock_, &s->status);
- if ((!done) && force)
+ result = THREAD_WAIT( &s->thread, secs, &s->done_signal, &s->done_lock, &s->status) ? CR_Cancelled : CR_Timeout;
+
+ if( (result == CR_Timeout) && force)
{
// Killing is asynchronous; we _will_ wait for it to be done at
// GC, to make sure the data structure can be released (alternative
@@ -1990,10 +2012,18 @@ static bool_t thread_cancel( struct s_lane *s, double secs, bool_t force)
// PThread seems to have).
//
THREAD_KILL( &s->thread);
- s->mstatus= KILLED; // mark 'gc' to wait for it
+ s->mstatus = KILLED; // mark 'gc' to wait for it
+ // note that s->status value must remain to whatever it was at the time of the kill
+ // because we need to know if we can lua_close() the Lua State or not.
+ result = CR_Killed;
}
}
- return done;
+ else
+ {
+ // say "ok" by default, including when lane is already done
+ result = CR_Cancelled;
+ }
+ return result;
}
LUAG_FUNC( thread_cancel)
@@ -2004,10 +2034,11 @@ LUAG_FUNC( thread_cancel)
}
else
{
- struct s_lane *s = lua_toLane( L, 1);
+ struct s_lane* s = lua_toLane( L, 1);
double secs = 0.0;
uint_t force_i = 2;
- bool_t force, done= TRUE;
+ cancel_result result;
+ bool_t force;
if( lua_isnumber( L, 2))
{
@@ -2015,15 +2046,32 @@ LUAG_FUNC( thread_cancel)
++ force_i;
}
else if( lua_isnil( L, 2))
+ {
++ force_i;
+ }
force = lua_toboolean( L, force_i); // FALSE if nothing there
- done = thread_cancel( s, secs, force);
-
- lua_pushboolean( L, done);
- return 1;
+ result = thread_cancel( s, secs, force);
+ switch( result)
+ {
+ case CR_Timeout:
+ lua_pushboolean( L, 0);
+ lua_pushstring( L, "timeout");
+ return 2;
+
+ case CR_Cancelled:
+ lua_pushboolean( L, 1);
+ return 1;
+
+ case CR_Killed:
+ lua_pushboolean( L, 0);
+ lua_pushstring( L, "killed");
+ return 2;
+ }
}
+ // should never happen, only here to prevent the compiler from complaining of "not all control paths returning a value"
+ return 0;
}
//---
@@ -2039,12 +2087,9 @@ LUAG_FUNC( thread_cancel)
static char const * thread_status_string( struct s_lane *s)
{
enum e_status st = s->status; // read just once (volatile)
- char const * str;
-
- if (s->mstatus == KILLED)
- st= CANCELLED;
-
- str= (st==PENDING) ? "pending" :
+ char const * str =
+ (s->mstatus == KILLED) ? "killed" : // new to v3.3.0!
+ (st==PENDING) ? "pending" :
(st==RUNNING) ? "running" : // like in 'co.status()'
(st==WAITING) ? "waiting" :
(st==DONE) ? "done" :
@@ -2053,12 +2098,13 @@ static char const * thread_status_string( struct s_lane *s)
return str;
}
-static void push_thread_status( lua_State *L, struct s_lane *s)
+static int push_thread_status( lua_State *L, struct s_lane *s)
{
char const * const str = thread_status_string( s);
ASSERT_L( str);
lua_pushstring( L, str );
+ return 1;
}
@@ -2070,15 +2116,15 @@ static void push_thread_status( lua_State *L, struct s_lane *s)
// error: returns nil + error value + stack table
// cancelled: returns nil
//
-LUAG_FUNC( thread_join )
+LUAG_FUNC( thread_join)
{
- struct s_lane *s= lua_toLane(L,1);
+ struct s_lane* const s = lua_toLane( L, 1);
double wait_secs= luaL_optnumber(L,2,-1.0);
lua_State *L2= s->L;
int ret;
bool_t done;
- done = THREAD_ISNULL( s->thread) || THREAD_WAIT( &s->thread, wait_secs, &s->done_signal_, &s->done_lock_, &s->status);
+ done = THREAD_ISNULL( s->thread) || THREAD_WAIT( &s->thread, wait_secs, &s->done_signal, &s->done_lock, &s->status);
if (!done || !L2)
return 0; // timeout: pushes none, leaves 'L2' alive
@@ -2086,34 +2132,49 @@ LUAG_FUNC( thread_join )
STACK_GROW( L, 1);
- switch( s->status)
+ if( s->mstatus == KILLED) // OS thread was killed if thread_cancel was forced
{
- case DONE:
+ // in that case, even if the thread was killed while DONE/ERROR_ST/CANCELLED, ignore regular return values
+
+ lua_pushnil( L);
+ lua_pushliteral( L, "killed");
+ ret = 2;
+ }
+ else
+ {
+ switch( s->status)
{
- uint_t n = lua_gettop( L2); // whole L2 stack
- if( (n > 0) && (luaG_inter_move( L2, L, n) != 0))
- luaL_error( L, "tried to copy unsupported types");
- ret = n;
- }
- break;
+ case DONE:
+ {
+ uint_t n = lua_gettop( L2); // whole L2 stack
+ if( (n > 0) && (luaG_inter_move( L2, L, n) != 0))
+ {
+ return luaL_error( L, "tried to copy unsupported types");
+ }
+ ret = n;
+ }
+ break;
- case ERROR_ST:
- lua_pushnil( L);
- if( luaG_inter_move( L2, L, 2) != 0) // error message at [-2], stack trace at [-1]
- luaL_error( L, "tried to copy unsupported types");
- ret= 3;
- break;
-
- case CANCELLED:
- ret= 0;
- break;
-
- default:
- DEBUGEXEC(fprintf( stderr, "Status: %d\n", s->status));
- ASSERT_L( FALSE ); ret= 0;
+ case ERROR_ST:
+ lua_pushnil( L);
+ if( luaG_inter_move( L2, L, 2) != 0) // error message at [-2], stack trace at [-1]
+ {
+ return luaL_error( L, "tried to copy unsupported types");
+ }
+ ret= 3;
+ break;
+
+ case CANCELLED:
+ ret= 0;
+ break;
+
+ default:
+ DEBUGEXEC(fprintf( stderr, "Status: %d\n", s->status));
+ ASSERT_L( FALSE ); ret= 0;
+ }
+ lua_close( L2);
}
- lua_close( L2);
- s->L = L2 = 0;
+ s->L = 0;
return ret;
}
@@ -2131,7 +2192,7 @@ LUAG_FUNC( thread_index)
{
int const UD = 1;
int const KEY = 2;
- int const ENV = 3;
+ int const USR = 3;
struct s_lane *s = lua_toLane( L, UD);
ASSERT_L( lua_gettop( L) == 2);
@@ -2142,10 +2203,10 @@ LUAG_FUNC( thread_index)
{
// first, check that we don't already have an environment that holds the requested value
{
- // If key is found in the environment, return it
+ // If key is found in the uservalue, return it
lua_getuservalue( L, UD);
lua_pushvalue( L, KEY);
- lua_rawget( L, ENV);
+ lua_rawget( L, USR);
if( !lua_isnil( L, -1))
{
return 1;
@@ -2157,27 +2218,40 @@ LUAG_FUNC( thread_index)
bool_t fetched;
lua_Integer key = lua_tointeger( L, KEY);
lua_pushinteger( L, 0);
- lua_rawget( L, ENV);
+ lua_rawget( L, USR);
fetched = !lua_isnil( L, -1);
- lua_pop( L, 1); // back to our 2 args + env on the stack
+ lua_pop( L, 1); // back to our 2 args + uservalue on the stack
if( !fetched)
{
lua_pushinteger( L, 0);
lua_pushboolean( L, 1);
- lua_rawset( L, ENV);
+ lua_rawset( L, USR);
// wait until thread has completed
lua_pushcfunction( L, LG_thread_join);
lua_pushvalue( L, UD);
lua_call( L, 1, LUA_MULTRET); // all return values are on the stack, at slots 4+
switch( s->status)
{
+ default:
+ if( s->mstatus != KILLED)
+ {
+ // this is an internal error, we probably never get here
+ lua_settop( L, 0);
+ lua_pushliteral( L, "Unexpected status: ");
+ lua_pushstring( L, thread_status_string( s));
+ lua_concat( L, 2);
+ lua_error( L);
+ break;
+ }
+ // fall through if we are killed, as we got nil, "killed" on the stack
+
case DONE: // got regular return values
{
int i, nvalues = lua_gettop( L) - 3;
for( i = nvalues; i > 0; -- i)
{
- // pop the last element of the stack, to store it in the environment at its proper index
- lua_rawseti( L, ENV, i);
+ // pop the last element of the stack, to store it in the uservalue at its proper index
+ lua_rawseti( L, USR, i);
}
}
break;
@@ -2190,28 +2264,19 @@ LUAG_FUNC( thread_index)
// store errstring at key -1
lua_pushnumber( L, -1);
lua_pushvalue( L, 5);
- lua_rawset( L, ENV);
+ lua_rawset( L, USR);
break;
case CANCELLED:
// do nothing
break;
-
- default:
- // this is an internal error, we probably never get here
- lua_settop( L, 0);
- lua_pushliteral( L, "Unexpected status: ");
- lua_pushstring( L, thread_status_string( s));
- lua_concat( L, 2);
- lua_error( L);
- break;
}
}
lua_settop( L, 3); // UD KEY ENV
if( key != -1)
{
lua_pushnumber( L, -1); // UD KEY ENV -1
- lua_rawget( L, ENV); // UD KEY ENV "error"
+ lua_rawget( L, USR); // UD KEY ENV "error"
if( !lua_isnil( L, -1)) // an error was stored
{
// Note: Lua 5.1 interpreter is not prepared to show
@@ -2238,7 +2303,7 @@ LUAG_FUNC( thread_index)
lua_pop( L, 1); // back to our 3 arguments on the stack
}
}
- lua_rawgeti( L, ENV, (int)key);
+ lua_rawgeti( L, USR, (int)key);
}
return 1;
}
@@ -2248,17 +2313,18 @@ LUAG_FUNC( thread_index)
lua_settop( L, 2); // keep only our original arguments on the stack
if( strcmp( keystr, "status") == 0)
{
- push_thread_status( L, s); // push the string representing the status
+ return push_thread_status( L, s); // push the string representing the status
}
- else if( strcmp( keystr, "cancel") == 0 || strcmp( keystr, "join") == 0)
+ // return UD.metatable[key]
+ lua_getmetatable( L, UD); // UD KEY mt
+ lua_replace( L, -3); // mt KEY
+ lua_rawget( L, -2); // mt value
+ // only "cancel" and "join" are registered as functions, any other string will raise an error
+ if( lua_iscfunction( L, -1))
{
- // return UD.metatable[key] (should be a function in both cases)
- lua_getmetatable( L, UD); // UD KEY mt
- lua_replace( L, -3); // mt KEY
- lua_rawget( L, -2); // mt value
- ASSERT_L( lua_iscfunction( L, -1));
+ return 1;
}
- return 1;
+ return luaL_error( L, "can't index a lane with '%s'", keystr);
}
// unknown key
lua_getmetatable( L, UD);
@@ -2338,14 +2404,14 @@ static const struct luaL_Reg lanes_functions [] = {
{"now_secs", LG_now_secs},
{"wakeup_conv", LG_wakeup_conv},
{"nameof", luaG_nameof},
- {"_single", LG__single},
+ {"set_singlethreaded", LG_set_singlethreaded},
{NULL, NULL}
};
/*
* One-time initializations
*/
-static void init_once_LOCKED( lua_State* L, volatile DEEP_PRELUDE** timer_deep_ref, int const nbKeepers, lua_CFunction _on_state_create)
+static void init_once_LOCKED( lua_State* L, volatile DEEP_PRELUDE** timer_deep_ref, int const nbKeepers, lua_CFunction _on_state_create, lua_Number _shutdown_timeout)
{
const char *err;
@@ -2398,7 +2464,7 @@ static void init_once_LOCKED( lua_State* L, volatile DEEP_PRELUDE** timer_deep_r
err = init_keepers( nbKeepers, _on_state_create);
if (err)
{
- luaL_error( L, "Unable to initialize: %s", err );
+ (void) luaL_error( L, "Unable to initialize: %s", err );
}
// Initialize 'timer_deep'; a common Linda object shared by all states
@@ -2428,7 +2494,8 @@ static void init_once_LOCKED( lua_State* L, volatile DEEP_PRELUDE** timer_deep_r
{
lua_newuserdata( L, 1);
lua_newtable( L);
- lua_pushcfunction( L, selfdestruct_gc);
+ lua_pushnumber( L, _shutdown_timeout);
+ lua_pushcclosure( L, selfdestruct_gc, 1);
lua_setfield( L, -2, "__gc");
lua_pushliteral( L, "AtExit");
lua_setfield( L, -2, "__metatable");
@@ -2446,10 +2513,10 @@ static volatile long s_initCount = 0;
LUAG_FUNC( configure )
{
char const* name = luaL_checkstring( L, lua_upvalueindex( 1));
- int const nbKeepers = luaL_optint( L, 1, 1);
+ // all parameter checks are done lua-side
+ int const nbKeepers = (int)lua_tointeger( L, 1);
lua_CFunction on_state_create = lua_iscfunction( L, 2) ? lua_tocfunction( L, 2) : NULL;
- luaL_argcheck( L, nbKeepers > 0, 1, "Number of keeper states must be > 0");
- luaL_argcheck( L, lua_iscfunction( L, 2) || lua_isnil( L, 2), 2, "on_state_create should be a C function");
+ lua_Number shutdown_timeout = lua_tonumber( L, 3);
/*
* Making one-time initializations.
*
@@ -2462,7 +2529,7 @@ LUAG_FUNC( configure )
static volatile int /*bool*/ go_ahead; // = 0
if( InterlockedCompareExchange( &s_initCount, 1, 0) == 0)
{
- init_once_LOCKED( L, &timer_deep, nbKeepers, on_state_create);
+ init_once_LOCKED( L, &timer_deep, nbKeepers, on_state_create, shutdown_timeout);
go_ahead= 1; // let others pass
}
else
@@ -2480,7 +2547,7 @@ LUAG_FUNC( configure )
//
if( s_initCount == 0)
{
- init_once_LOCKED( L, &timer_deep, nbKeepers, on_state_create);
+ init_once_LOCKED( L, &timer_deep, nbKeepers, on_state_create, shutdown_timeout);
s_initCount = 1;
}
}
@@ -2525,7 +2592,7 @@ LUAG_FUNC( configure )
lua_setfield(L, -2, "timer_gateway");
lua_pushstring(L, VERSION);
- lua_setfield(L, -2, "_version");
+ lua_setfield(L, -2, "version");
lua_pushinteger(L, THREAD_PRIO_MAX);
lua_setfield(L, -2, "max_prio");
diff --git a/src/lanes.lua b/src/lanes.lua
index e6400df..c9ab07d 100644
--- a/src/lanes.lua
+++ b/src/lanes.lua
@@ -40,55 +40,93 @@ THE SOFTWARE.
]]--
-- Lua 5.1: module() creates a global variable
--- Lua 5.2: module() might go away
--- almost everything module() does is done by require()
+-- Lua 5.2: module() is gone
+-- almost everything module() does is done by require() anyway
-- -> simply create a table, populate it, return it, and be done
local lanes = {}
lanes.configure = function( _params)
-_params = _params or { nb_keepers = 1, with_timers = true, on_state_create = nil}
-if type( _params) ~= "table" then
- error( "Bad parameter #1 to lanes.configure(), should be a table")
-end
--- on_state_create may be nil or a function
-if _params.on_state_create and (type( _params.on_state_create) ~= "function") then
- error( "Bad on_state_create: " .. tostring( _params.on_state_create), 2)
-end
-local mm = require "lanes.core"
-assert( type(mm)=="table" )
+ -- This check is for sublanes requiring Lanes
+ --
+ -- TBD: We could also have the C level expose 'string.gmatch' for us. But this is simpler.
+ --
+ if not string then
+ error( "To use 'lanes', you will also need to have 'string' available.", 2)
+ end
--- configure() is available only the first time lanes.core is required process-wide, and we *must* call it to have the other functions in the interface
-if mm.configure then mm.configure( _params.nb_keepers, _params.on_state_create) end
+ --
+ -- Cache globals for code that might run under sandboxing
+ --
+ local assert = assert
+ local string_gmatch = assert( string.gmatch)
+ local select = assert( select)
+ local type = assert( type)
+ local pairs = assert( pairs)
+ local tostring = assert( tostring)
+ local error = assert( error)
+
+ local default_params = { nb_keepers = 1, on_state_create = nil, shutdown_timeout = 0.25, with_timers = true}
+ local param_checkers =
+ {
+ nb_keepers = function( _val)
+ -- nb_keepers should be a number > 0
+ return type( _val) == "number" and _val > 0
+ end,
+ with_timers = function( _val)
+ -- with_timers may be nil or boolean
+ return _val and type( _val) == "boolean" or true
+ end,
+ on_state_create = function( _val)
+ -- on_state_create may be nil or a function
+ return _val and type( _val) == "function" or true
+ end,
+ shutdown_timeout = function( _val)
+ -- nb_keepers should be a number >= 0
+ return type( _val) == "number" and _val >= 0
+ end
+ }
-local thread_new = assert(mm.thread_new)
+ local params_checker = function( _params)
+ if not _params then
+ return default_params
+ end
+ if type( _params) ~= "table" then
+ error( "Bad parameter #1 to lanes.configure(), should be a table")
+ end
+ -- any setting not present in the provided parameters takes the default value
+ for key, value in pairs( default_params) do
+ local my_param = _params[key]
+ local param
+ if my_param ~= nil then
+ param = my_param
+ else
+ param = default_params[key]
+ end
+ if not param_checkers[key]( param) then
+ error( "Bad " .. key .. ": " .. tostring( param), 2)
+ end
+ _params[key] = param
+ end
+ return _params
+ end
-local _single= assert(mm._single)
-local _version= assert(mm._version)
+ _params = params_checker( _params)
-local now_secs= assert( mm.now_secs )
-local wakeup_conv= assert( mm.wakeup_conv )
+ local core = require "lanes.core"
+ assert( type( core)=="table")
-local max_prio= assert( mm.max_prio )
+ -- configure() is available only the first time lanes.core is required process-wide, and we *must* call it to have the other functions in the interface
+ if core.configure then core.configure( _params.nb_keepers, _params.on_state_create, _params.shutdown_timeout) end
--- This check is for sublanes requiring Lanes
---
--- TBD: We could also have the C level expose 'string.gmatch' for us. But this is simpler.
---
-if not string then
- error( "To use 'lanes', you will also need to have 'string' available.", 2 )
-end
+ local thread_new = assert( core.thread_new)
---
--- Cache globals for code that might run under sandboxing
---
-local assert= assert
-local string_gmatch= assert( string.gmatch )
-local select= assert( select )
-local type= assert( type )
-local pairs= assert( pairs )
-local tostring= assert( tostring )
-local error= assert( error )
+ local set_singlethreaded = assert( core.set_singlethreaded)
+
+ local now_secs = assert( core.now_secs)
+ local wakeup_conv = assert( core.wakeup_conv)
+
+ local max_prio = assert( core.max_prio)
lanes.ABOUT=
{
@@ -96,7 +134,7 @@ lanes.ABOUT=
description= "Running multiple Lua states in parallel",
license= "MIT/X11",
copyright= "Copyright (c) 2007-10, Asko Kauppi; (c) 2011-12, Benoit Germain",
- version= _version,
+ version = assert( core.version)
}
@@ -258,7 +296,7 @@ end
-- lanes.linda(["name"]) -> linda_ud
--
-- PUBLIC LANES API
-local linda = mm.linda
+local linda = core.linda
---=== Timers ===---
@@ -268,7 +306,7 @@ local timer = function() error "timers are not active" end
if _params.with_timers ~= false then
-local timer_gateway= assert( mm.timer_gateway )
+local timer_gateway = assert( core.timer_gateway)
--
-- On first 'require "lanes"', a timer lane is spawned that will maintain
-- timer tables and sleep in between the timer events. All interaction with
@@ -558,28 +596,23 @@ local function genatomic( linda, key, initial_val )
end
end
--- newuserdata = mm.newuserdata
-
-- activate full interface
lanes.gen = gen
- lanes.linda = mm.linda
- lanes.cancel_error = mm.cancel_error
- lanes.nameof = mm.nameof
+ lanes.linda = core.linda
+ lanes.cancel_error = core.cancel_error
+ lanes.nameof = core.nameof
lanes.timer = timer
lanes.genlock = genlock
lanes.now_secs = now_secs
lanes.genatomic = genatomic
-- from now on, calling configure does nothing but checking that we don't call it with parameters that changed compared to the first invocation
lanes.configure = function( _params2)
- _params2 = _params2 or _params
- if _params2.nb_keepers ~= _params.nb_keepers then
- error( "mismatched configuration: " .. tostring( _params2.nb_keepers) .. " keepers instead of " .. tostring( _params.nb_keepers))
- end
- if _params2.with_timers ~= _params.with_timers then
- error( "mismatched configuration: " .. tostring( _params2.with_timers) .. " timer activity instead of " .. tostring( _params.with_timers))
- end
- if _params2.on_create_state and _params2.on_create_state ~= _params.on_create_state then
- error( "mismatched configuration: " .. tostring( _params2.on_create_state) .. " timer activity instead of " .. tostring( _params.on_create_state))
+ _params2 = params_checker( _params2 or _params)
+ for key, value2 in pairs( _params2) do
+ local value = _params[key]
+ if value2 ~= value then
+ error( "mismatched configuration: " .. key .. " is " .. tostring( value2) .. " instead of " .. tostring( value))
+ end
end
return lanes
end
@@ -588,4 +621,3 @@ end -- lanes.configure
--the end
return lanes
-
diff --git a/src/threading.h b/src/threading.h
index b0a3db0..f4f1ada 100644
--- a/src/threading.h
+++ b/src/threading.h
@@ -35,6 +35,9 @@ typedef unsigned int uint_t;
#include
/* Note: ERROR is a defined entity on Win32
+ PENDING: The Lua VM hasn't done anything yet.
+ RUNNING, WAITING: Thread is inside the Lua VM. If the thread is forcefully stopped, we can't lua_close() the Lua State.
+ DONE, ERROR_ST, CANCELLED: Thread execution is outside the Lua VM. It can be lua_close()d.
*/
enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED };
diff --git a/tests/fibonacci.lua b/tests/fibonacci.lua
index 452a770..48cd8d7 100644
--- a/tests/fibonacci.lua
+++ b/tests/fibonacci.lua
@@ -4,7 +4,7 @@
-- Parallel calculation of Fibonacci numbers
--
-- A sample of task splitting like Intel TBB library does.
---
+--
-- References:
-- Intel Threading Building Blocks, 'test all'
--
@@ -41,7 +41,7 @@ local function fib( n )
--
-- note that lanes is pulled in as upvalue, so we need package library to require internals properly
-- (because lua51-lanes is always required internally if possible, which is necessary in that case)
- local gen_f= lanes.gen( "package,string,io,math,debug", fib )
+ local gen_f= lanes.gen( "*", fib )
local n1=floor(n/2) +1
local n2=floor(n/2) -1 + n%2
@@ -60,7 +60,7 @@ local function fib( n )
end
io.stderr:write( "fib("..n..") = "..sum.."\n" )
-
+
return sum
end
diff --git a/tests/finalizer.lua b/tests/finalizer.lua
index 6f186ab..dc9ed34 100644
--- a/tests/finalizer.lua
+++ b/tests/finalizer.lua
@@ -31,6 +31,7 @@ local function lane()
io.stderr:write( "File "..FN.." created\n" )
if which==0 then
+ print "you loose"
error("aa") -- exception here; the value needs NOT be a string
end
@@ -55,6 +56,7 @@ cleanup= function(err)
end
local _,err2= os.remove(FN)
+ print( "file removal result: ", tostring( err2))
assert(not err2) -- if this fails, it will be shown in the calling script
-- as an error from the lane itself
--
cgit v1.2.3-55-g6feb
|
|