aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenoit Germain <bnt.germain@gmail.com>2019-04-19 14:58:25 +0200
committerBenoit Germain <bnt.germain@gmail.com>2019-04-19 14:58:25 +0200
commitbe1e9d37d9809ee55f26d811208fa64ea9b3785a (patch)
tree1b0122ed7e7f501c64814cdbbf03e5941dbab51c
parent52934d46eed850c23a9b21125be73e987f34e772 (diff)
downloadlanes-be1e9d37d9809ee55f26d811208fa64ea9b3785a.tar.gz
lanes-be1e9d37d9809ee55f26d811208fa64ea9b3785a.tar.bz2
lanes-be1e9d37d9809ee55f26d811208fa64ea9b3785a.zip
lane:cancel internal code refactorization
Diffstat (limited to '')
-rw-r--r--src/lanes.c151
-rw-r--r--src/linda.c4
-rw-r--r--tests/basic.lua112
3 files changed, 130 insertions, 137 deletions
diff --git a/src/lanes.c b/src/lanes.c
index 90da9bf..abd4171 100644
--- a/src/lanes.c
+++ b/src/lanes.c
@@ -169,20 +169,6 @@ static DECLARE_CONST_UNIQUE_KEY( FINALIZER_REGKEY, 0x188fccb8bf348e09);
169 169
170struct s_Linda; 170struct s_Linda;
171 171
172#if 1
173# define DEBUG_SIGNAL( msg, signal_ref ) /* */
174#else
175# define DEBUG_SIGNAL( msg, signal_ref ) \
176 { int i; unsigned char *ptr; char buf[999]; \
177 sprintf( buf, ">>> " msg ": %p\t", (signal_ref) ); \
178 ptr= (unsigned char *)signal_ref; \
179 for( i=0; i<sizeof(*signal_ref); i++ ) { \
180 sprintf( strchr(buf,'\0'), "%02x %c ", ptr[i], ptr[i] ); \
181 } \
182 fprintf( stderr, "%s\n", buf ); \
183 }
184#endif
185
186/* 172/*
187* Push a table stored in registry onto Lua stack. 173* Push a table stored in registry onto Lua stack.
188* 174*
@@ -442,90 +428,97 @@ typedef enum
442 CR_Killed 428 CR_Killed
443} cancel_result; 429} cancel_result;
444 430
445static cancel_result thread_cancel( lua_State* L, Lane* s, double secs, bool_t force, double waitkill_timeout_) 431static cancel_result thread_cancel_soft( lua_State* L, Lane* s, bool_t wake_lindas_)
446{ 432{
447 cancel_result result; 433 s->cancel_request = CANCEL_SOFT; // it's now signaled to stop
448 434 // negative timeout: we don't want to truly abort the lane, we just want it to react to cancel_test() on its own
449 // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here 435 if( wake_lindas_) // wake the thread so that execution returns from any pending linda operation if desired
450 // We can read 's->status' without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN)
451 if( s->mstatus == KILLED)
452 { 436 {
453 result = CR_Killed; 437 SIGNAL_T *waiting_on = s->waiting_on;
438 if( s->status == WAITING && waiting_on != NULL)
439 {
440 SIGNAL_ALL( waiting_on);
441 }
454 } 442 }
455 else if( s->status < DONE) 443 // say we succeeded though
444 return CR_Cancelled;
445}
446
447static cancel_result thread_cancel_hard( lua_State* L, Lane* s, double secs_, bool_t force_, double waitkill_timeout_)
448{
449 cancel_result result;
450
451 s->cancel_request = CANCEL_HARD; // it's now signaled to stop
456 { 452 {
457 // signal the linda the wake up the thread so that it can react to the cancel query 453 SIGNAL_T *waiting_on = s->waiting_on;
458 // let us hope we never land here with a pointer on a linda that has been destroyed... 454 if( s->status == WAITING && waiting_on != NULL)
459 if( secs < 0.0)
460 { 455 {
461 s->cancel_request = CANCEL_SOFT; // it's now signaled to stop 456 SIGNAL_ALL( waiting_on);
462 // negative timeout: we don't want to truly abort the lane, we just want it to react to cancel_test() on its own
463 if( force) // wake the thread so that execution returns from any pending linda operation if desired
464 {
465 SIGNAL_T *waiting_on = s->waiting_on;
466 if( s->status == WAITING && waiting_on != NULL)
467 {
468 SIGNAL_ALL( waiting_on);
469 }
470 }
471 // say we succeeded though
472 result = CR_Cancelled;
473 } 457 }
474 else 458 }
475 {
476 s->cancel_request = CANCEL_HARD; // it's now signaled to stop
477 {
478 SIGNAL_T *waiting_on = s->waiting_on;
479 if( s->status == WAITING && waiting_on != NULL)
480 {
481 SIGNAL_ALL( waiting_on);
482 }
483 }
484 459
485 result = THREAD_WAIT( &s->thread, secs, &s->done_signal, &s->done_lock, &s->status) ? CR_Cancelled : CR_Timeout; 460 result = THREAD_WAIT( &s->thread, secs_, &s->done_signal, &s->done_lock, &s->status) ? CR_Cancelled : CR_Timeout;
486 461
487 if( (result == CR_Timeout) && force) 462 if( (result == CR_Timeout) && force_)
488 { 463 {
489 // Killing is asynchronous; we _will_ wait for it to be done at 464 // Killing is asynchronous; we _will_ wait for it to be done at
490 // GC, to make sure the data structure can be released (alternative 465 // GC, to make sure the data structure can be released (alternative
491 // would be use of "cancellation cleanup handlers" that at least 466 // would be use of "cancellation cleanup handlers" that at least
492 // PThread seems to have). 467 // PThread seems to have).
493 // 468 //
494 THREAD_KILL( &s->thread); 469 THREAD_KILL( &s->thread);
495#if THREADAPI == THREADAPI_PTHREAD 470#if THREADAPI == THREADAPI_PTHREAD
496 // pthread: make sure the thread is really stopped! 471 // pthread: make sure the thread is really stopped!
497 // note that this may block forever if the lane doesn't call a cancellation point and pthread doesn't honor PTHREAD_CANCEL_ASYNCHRONOUS 472 // note that this may block forever if the lane doesn't call a cancellation point and pthread doesn't honor PTHREAD_CANCEL_ASYNCHRONOUS
498 result = THREAD_WAIT( &s->thread, waitkill_timeout_, &s->done_signal, &s->done_lock, &s->status); 473 result = THREAD_WAIT( &s->thread, waitkill_timeout_, &s->done_signal, &s->done_lock, &s->status);
499 if( result == CR_Timeout) 474 if( result == CR_Timeout)
500 { 475 {
501 return luaL_error( L, "force-killed lane failed to terminate within %f second%s", waitkill_timeout_, waitkill_timeout_ > 1 ? "s" : ""); 476 return luaL_error( L, "force-killed lane failed to terminate within %f second%s", waitkill_timeout_, waitkill_timeout_ > 1 ? "s" : "");
502 } 477 }
503#else 478#else
504 (void) waitkill_timeout_; // unused 479 (void) waitkill_timeout_; // unused
505 (void) L; // unused 480 (void) L; // unused
506#endif // THREADAPI == THREADAPI_PTHREAD 481#endif // THREADAPI == THREADAPI_PTHREAD
507 s->mstatus = KILLED; // mark 'gc' to wait for it 482 s->mstatus = KILLED; // mark 'gc' to wait for it
508 // note that s->status value must remain to whatever it was at the time of the kill 483 // note that s->status value must remain to whatever it was at the time of the kill
509 // because we need to know if we can lua_close() the Lua State or not. 484 // because we need to know if we can lua_close() the Lua State or not.
510 result = CR_Killed; 485 result = CR_Killed;
511 }
512 }
513 } 486 }
514 else 487 return result;
488}
489
490static cancel_result thread_cancel( lua_State* L, Lane* s, double secs_, bool_t force_, double waitkill_timeout_)
491{
492 // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here
493 // We can read 's->status' without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN)
494 if( s->mstatus == KILLED)
495 {
496 return CR_Killed;
497 }
498
499 if( s->status >= DONE)
515 { 500 {
516 // say "ok" by default, including when lane is already done 501 // say "ok" by default, including when lane is already done
517 result = CR_Cancelled; 502 return CR_Cancelled;
518 } 503 }
519 return result; 504
505 // signal the linda the wake up the thread so that it can react to the cancel query
506 // let us hope we never land here with a pointer on a linda that has been destroyed...
507 if( secs_ < 0.0)
508 {
509 return thread_cancel_soft( L, s, force_);
510 }
511
512 return thread_cancel_hard( L, s, secs_, force_, waitkill_timeout_);
520} 513}
521 514
522 // 515//
523 // Protects modifying the selfdestruct chain 516// Protects modifying the selfdestruct chain
524 517
525#define SELFDESTRUCT_END ((Lane*)(-1)) 518#define SELFDESTRUCT_END ((Lane*)(-1))
526 // 519//
527 // The chain is ended by '(Lane*)(-1)', not NULL: 520// The chain is ended by '(Lane*)(-1)', not NULL:
528 // 'selfdestruct_first -> ... -> ... -> (-1)' 521// 'selfdestruct_first -> ... -> ... -> (-1)'
529 522
530/* 523/*
531 * Add the lane to selfdestruct chain; the ones still running at the end of the 524 * Add the lane to selfdestruct chain; the ones still running at the end of the
diff --git a/src/linda.c b/src/linda.c
index 69f41b6..150649d 100644
--- a/src/linda.c
+++ b/src/linda.c
@@ -245,7 +245,7 @@ LUAG_FUNC( linda_send)
245 { 245 {
246 case CANCEL_SOFT: 246 case CANCEL_SOFT:
247 // if user wants to soft-cancel, the call returns lanes.cancel_error 247 // if user wants to soft-cancel, the call returns lanes.cancel_error
248 push_unique_key( L, CANCEL_ERROR); 248 push_unique_key( L, CANCEL_ERROR);
249 return 1; 249 return 1;
250 250
251 case CANCEL_HARD: 251 case CANCEL_HARD:
@@ -400,7 +400,7 @@ LUAG_FUNC( linda_receive)
400 { 400 {
401 case CANCEL_SOFT: 401 case CANCEL_SOFT:
402 // if user wants to soft-cancel, the call returns CANCEL_ERROR 402 // if user wants to soft-cancel, the call returns CANCEL_ERROR
403 push_unique_key( L, CANCEL_ERROR); 403 push_unique_key( L, CANCEL_ERROR);
404 return 1; 404 return 1;
405 405
406 case CANCEL_HARD: 406 case CANCEL_HARD:
diff --git a/tests/basic.lua b/tests/basic.lua
index 7d42ad5..020fe78 100644
--- a/tests/basic.lua
+++ b/tests/basic.lua
@@ -26,7 +26,7 @@ local function PRINT(...)
26end 26end
27 27
28local gc_cb = function( name_, status_) 28local gc_cb = function( name_, status_)
29 PRINT( " ---> lane '" .. name_ .. "' collected with status " .. status_) 29 PRINT( " ---> lane '" .. name_ .. "' collected with status " .. status_)
30end 30end
31--gc_cb = nil 31--gc_cb = nil
32 32
@@ -77,7 +77,7 @@ local function task( a, b, c )
77end 77end
78 78
79local task_launch= lanes_gen( "", { globals={hey=true}, gc_cb = gc_cb}, task ) 79local task_launch= lanes_gen( "", { globals={hey=true}, gc_cb = gc_cb}, task )
80 -- base stdlibs, normal priority 80 -- base stdlibs, normal priority
81 81
82-- 'task_launch' is a factory of multithreaded tasks, we can launch several: 82-- 'task_launch' is a factory of multithreaded tasks, we can launch several:
83 83
@@ -155,12 +155,12 @@ limited:limit( "key", 1)
155-- [[################################################ 155-- [[################################################
156limited:send( "key", "hello") -- saturate linda 156limited:send( "key", "hello") -- saturate linda
157for k, v in pairs( limited:dump()) do 157for k, v in pairs( limited:dump()) do
158 PRINT("limited[" .. tostring( k) .. "] = " .. tostring( v)) 158 PRINT("limited[" .. tostring( k) .. "] = " .. tostring( v))
159end 159end
160local wait_send = function() 160local wait_send = function()
161 local a,b 161 local a,b
162 set_finalizer( function() print( "wait_send", a, b) end) 162 set_finalizer( function() print( "wait_send", a, b) end)
163 a,b = limited:send( "key", "bybye") -- infinite timeout, returns only when lane is cancelled 163 a,b = limited:send( "key", "bybye") -- infinite timeout, returns only when lane is cancelled
164end 164end
165 165
166local wait_send_lane = lanes.gen( "*", wait_send)() 166local wait_send_lane = lanes.gen( "*", wait_send)()
@@ -171,9 +171,9 @@ repeat until wait_send_lane.status == "cancelled"
171print "wait_send_lane is cancelled" 171print "wait_send_lane is cancelled"
172--################################################]] 172--################################################]]
173local wait_receive = function() 173local wait_receive = function()
174 local k, v 174 local k, v
175 set_finalizer( function() print( "wait_receive", k, v) end) 175 set_finalizer( function() print( "wait_receive", k, v) end)
176 k, v = limited:receive( "dummy") -- infinite timeout, returns only when lane is cancelled 176 k, v = limited:receive( "dummy") -- infinite timeout, returns only when lane is cancelled
177end 177end
178 178
179local wait_receive_lane = lanes.gen( "*", wait_receive)() 179local wait_receive_lane = lanes.gen( "*", wait_receive)()
@@ -184,9 +184,9 @@ repeat until wait_receive_lane.status == "cancelled"
184print "wait_receive_lane is cancelled" 184print "wait_receive_lane is cancelled"
185--################################################]] 185--################################################]]
186local wait_receive_batched = function() 186local wait_receive_batched = function()
187 local k, v1, v2 187 local k, v1, v2
188 set_finalizer( function() print( "wait_receive_batched", k, v1, v2) end) 188 set_finalizer( function() print( "wait_receive_batched", k, v1, v2) end)
189 k, v1, v2 = limited:receive( limited.batched, "dummy", 2) -- infinite timeout, returns only when lane is cancelled 189 k, v1, v2 = limited:receive( limited.batched, "dummy", 2) -- infinite timeout, returns only when lane is cancelled
190end 190end
191 191
192local wait_receive_batched_lane = lanes.gen( "*", wait_receive_batched)() 192local wait_receive_batched_lane = lanes.gen( "*", wait_receive_batched)()
@@ -206,7 +206,7 @@ PRINT( "\n\n", "---=== Communications ===---", "\n\n")
206local function WR(...) io.stderr:write(...) end 206local function WR(...) io.stderr:write(...) end
207 207
208local chunk= function( linda ) 208local chunk= function( linda )
209 set_debug_threadname "chunk" 209 set_debug_threadname "chunk"
210 local function receive() return linda:receive( "->" ) end 210 local function receive() return linda:receive( "->" ) end
211 local function send(...) linda:send( "<-", ... ) end 211 local function send(...) linda:send( "<-", ... ) end
212 212
@@ -254,9 +254,9 @@ local a,b,c= RECEIVE(), RECEIVE(), RECEIVE()
254 254
255print( "lane status: " .. t.status) 255print( "lane status: " .. t.status)
256if t.status == "error" then 256if t.status == "error" then
257 print( t:join()) 257 print( t:join())
258else 258else
259 WR( a..", "..b..", "..c.." received\n" ) 259 WR( a..", "..b..", "..c.." received\n" )
260end 260end
261 261
262assert( a==1 and b==2 and c==3 ) 262assert( a==1 and b==2 and c==3 )
@@ -286,50 +286,50 @@ linda: receive( 1, "wait")
286PRINT( "\n\n", "---=== Stdlib naming ===---", "\n\n") 286PRINT( "\n\n", "---=== Stdlib naming ===---", "\n\n")
287 287
288local function dump_g( _x) 288local function dump_g( _x)
289 set_debug_threadname "dump_g" 289 set_debug_threadname "dump_g"
290 assert(print) 290 assert(print)
291 print( "### dumping _G for '" .. _x .. "'") 291 print( "### dumping _G for '" .. _x .. "'")
292 for k, v in pairs( _G) do 292 for k, v in pairs( _G) do
293 print( "\t" .. k .. ": " .. type( v)) 293 print( "\t" .. k .. ": " .. type( v))
294 end 294 end
295 return true 295 return true
296end 296end
297 297
298local function io_os_f( _x) 298local function io_os_f( _x)
299 set_debug_threadname "io_os_f" 299 set_debug_threadname "io_os_f"
300 assert(print) 300 assert(print)
301 print( "### checking io and os libs existence for '" .. _x .. "'") 301 print( "### checking io and os libs existence for '" .. _x .. "'")
302 assert(io) 302 assert(io)
303 assert(os) 303 assert(os)
304 return true 304 return true
305end 305end
306 306
307local function coro_f( _x) 307local function coro_f( _x)
308 set_debug_threadname "coro_f" 308 set_debug_threadname "coro_f"
309 assert(print) 309 assert(print)
310 print( "### checking coroutine lib existence for '" .. _x .. "'") 310 print( "### checking coroutine lib existence for '" .. _x .. "'")
311 assert(coroutine) 311 assert(coroutine)
312 return true 312 return true
313end 313end
314 314
315assert.fails( function() lanes_gen( "xxx", {gc_cb = gc_cb}, io_os_f ) end ) 315assert.fails( function() lanes_gen( "xxx", {gc_cb = gc_cb}, io_os_f ) end )
316 316
317local stdlib_naming_tests = 317local stdlib_naming_tests =
318{ 318{
319 -- { "", dump_g}, 319 -- { "", dump_g},
320 -- { "coroutine", dump_g}, 320 -- { "coroutine", dump_g},
321 -- { "io", dump_g}, 321 -- { "io", dump_g},
322 -- { "bit32", dump_g}, 322 -- { "bit32", dump_g},
323 { "coroutine", coro_f}, 323 { "coroutine", coro_f},
324 { "*", io_os_f}, 324 { "*", io_os_f},
325 { "io,os", io_os_f}, 325 { "io,os", io_os_f},
326 { "io+os", io_os_f}, 326 { "io+os", io_os_f},
327 { "io,os,base", io_os_f}, 327 { "io,os,base", io_os_f},
328} 328}
329 329
330for _, t in ipairs( stdlib_naming_tests) do 330for _, t in ipairs( stdlib_naming_tests) do
331 local f= lanes_gen( t[1], {gc_cb = gc_cb}, t[2]) -- any delimiter will do 331 local f= lanes_gen( t[1], {gc_cb = gc_cb}, t[2]) -- any delimiter will do
332 assert( f(t[1])[1] ) 332 assert( f(t[1])[1] )
333end 333end
334 334
335collectgarbage() 335collectgarbage()
@@ -344,7 +344,7 @@ PRINT( "\n\n", "---=== Comms criss cross ===---", "\n\n")
344-- 344--
345local tc= lanes_gen( "io", {gc_cb = gc_cb}, 345local tc= lanes_gen( "io", {gc_cb = gc_cb},
346 function( linda, ch_in, ch_out ) 346 function( linda, ch_in, ch_out )
347 set_debug_threadname( "criss cross " .. ch_in .. " -> " .. ch_out) 347 set_debug_threadname( "criss cross " .. ch_in .. " -> " .. ch_out)
348 local function STAGE(str) 348 local function STAGE(str)
349 io.stderr:write( ch_in..": "..str.."\n" ) 349 io.stderr:write( ch_in..": "..str.."\n" )
350 linda:send( nil, ch_out, str ) 350 linda:send( nil, ch_out, str )
@@ -412,7 +412,7 @@ linda:send( "down", function(linda) linda:send( "up", "ready!" ) end,
412-- 412--
413local k,s= linda:receive( 1, "up" ) 413local k,s= linda:receive( 1, "up" )
414if t2.status == "error" then 414if t2.status == "error" then
415 print( "t2 error: " , t2:join()) 415 print( "t2 error: " , t2:join())
416end 416end
417PRINT(s) 417PRINT(s)
418assert( s=="ready!" ) 418assert( s=="ready!" )
@@ -440,13 +440,13 @@ PRINT( "\n\n", "---=== :join test ===---", "\n\n")
440 440
441local S= lanes_gen( "table", {gc_cb = gc_cb}, 441local S= lanes_gen( "table", {gc_cb = gc_cb},
442 function(arg) 442 function(arg)
443 set_debug_threadname "join test lane" 443 set_debug_threadname "join test lane"
444 set_finalizer( function() end) 444 set_finalizer( function() end)
445 aux= {} 445 aux= {}
446 for i, v in ipairs(arg) do 446 for i, v in ipairs(arg) do
447 table.insert (aux, 1, v) 447 table.insert (aux, 1, v)
448 end 448 end
449 -- unpack was renamed table.unpack in Lua 5.2: cater for both! 449 -- unpack was renamed table.unpack in Lua 5.2: cater for both!
450 return (unpack or table.unpack)(aux) 450 return (unpack or table.unpack)(aux)
451end ) 451end )
452 452
@@ -456,13 +456,13 @@ linda:receive(0.5, "gloupti")
456print( "joining with '" .. h:get_debug_threadname() .. "'") 456print( "joining with '" .. h:get_debug_threadname() .. "'")
457local a,b,c,d= h:join() 457local a,b,c,d= h:join()
458if h.status == "error" then 458if h.status == "error" then
459 print( h:get_debug_threadname(), "error: " , a, b, c, d) 459 print( h:get_debug_threadname(), "error: " , a, b, c, d)
460else 460else
461 print( h:get_debug_threadname(), a,b,c,d) 461 print( h:get_debug_threadname(), a,b,c,d)
462 assert(a==14) 462 assert(a==14)
463 assert(b==13) 463 assert(b==13)
464 assert(c==12) 464 assert(c==12)
465 assert(d==nil) 465 assert(d==nil)
466end 466end
467 467
468local nameof_type, nameof_name = lanes.nameof( print) 468local nameof_type, nameof_name = lanes.nameof( print)