aboutsummaryrefslogtreecommitdiff
path: root/src/lanes.lua
diff options
context:
space:
mode:
Diffstat (limited to 'src/lanes.lua')
-rw-r--r--src/lanes.lua451
1 files changed, 229 insertions, 222 deletions
diff --git a/src/lanes.lua b/src/lanes.lua
index 6af286a..fd3d22b 100644
--- a/src/lanes.lua
+++ b/src/lanes.lua
@@ -73,6 +73,7 @@ lanes.configure = function( settings_)
73 keepers_gc_threshold = -1, 73 keepers_gc_threshold = -1,
74 on_state_create = nil, 74 on_state_create = nil,
75 shutdown_timeout = 0.25, 75 shutdown_timeout = 0.25,
76 shutdown_mode = "hard",
76 with_timers = true, 77 with_timers = true,
77 track_lanes = false, 78 track_lanes = false,
78 demote_full_userdata = nil, 79 demote_full_userdata = nil,
@@ -113,6 +114,11 @@ lanes.configure = function( settings_)
113 -- shutdown_timeout should be a number >= 0 114 -- shutdown_timeout should be a number >= 0
114 return type( val_) == "number" and val_ >= 0 115 return type( val_) == "number" and val_ >= 0
115 end, 116 end,
117 shutdown_mode = function( val_)
118 local valid_hooks = { soft = true, hard = true, call = true, ret = true, line = true, count = true }
119 -- shutdown_mode should be a known hook mask
120 return valid_hooks[val_]
121 end,
116 track_lanes = boolean_param_checker, 122 track_lanes = boolean_param_checker,
117 demote_full_userdata = boolean_param_checker, 123 demote_full_userdata = boolean_param_checker,
118 verbose_errors = boolean_param_checker 124 verbose_errors = boolean_param_checker
@@ -367,262 +373,263 @@ lanes.configure = function( settings_)
367 373
368 374
369 if settings.with_timers ~= false then 375 if settings.with_timers ~= false then
376 --
377 -- On first 'require "lanes"', a timer lane is spawned that will maintain
378 -- timer tables and sleep in between the timer events. All interaction with
379 -- the timer lane happens via a 'timer_gateway' Linda, which is common to
380 -- all that 'require "lanes"'.
381 --
382 -- Linda protocol to timer lane:
383 --
384 -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs]
385 --
386 local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging
387 local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)"
388 local first_time_key= "first time"
370 389
371 -- 390 local first_time = timer_gateway:get( first_time_key) == nil
372 -- On first 'require "lanes"', a timer lane is spawned that will maintain 391 timer_gateway:set( first_time_key, true)
373 -- timer tables and sleep in between the timer events. All interaction with
374 -- the timer lane happens via a 'timer_gateway' Linda, which is common to
375 -- all that 'require "lanes"'.
376 --
377 -- Linda protocol to timer lane:
378 --
379 -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs]
380 --
381 local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging
382 local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)"
383 local first_time_key= "first time"
384
385 local first_time = timer_gateway:get( first_time_key) == nil
386 timer_gateway:set( first_time_key, true)
387
388 --
389 -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally
390 -- has 'table' always declared)
391 --
392 if first_time then
393 392
394 local now_secs = core.now_secs 393 local now_secs = core.now_secs
395 assert( type( now_secs) == "function") 394 local wakeup_conv = core.wakeup_conv
396 ----- 395
397 -- Snore loop (run as a lane on the background)
398 --
399 -- High priority, to get trustworthy timings.
400 -- 396 --
401 -- We let the timer lane be a "free running" thread; no handle to it 397 -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally
402 -- remains. 398 -- has 'table' always declared)
403 -- 399 --
404 local timer_body = function() 400 if first_time then
405 set_debug_threadname( "LanesTimer") 401
406 -- 402 assert( type( now_secs) == "function")
407 -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h, 403 -----
408 -- [key]= { wakeup_secs [,period_secs] } [, ...] }, 404 -- Snore loop (run as a lane on the background)
409 -- }
410 --
411 -- Collection of all running timers, indexed with linda's & key.
412 -- 405 --
413 -- Note that we need to use the deep lightuserdata identifiers, instead 406 -- High priority, to get trustworthy timings.
414 -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple
415 -- entries for the same timer.
416 -- 407 --
417 -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but 408 -- We let the timer lane be a "free running" thread; no handle to it
418 -- also important to keep the Linda alive, even if all outside world threw 409 -- remains.
419 -- away pointers to it (which would ruin uniqueness of the deep pointer).
420 -- Now we're safe.
421 -- 410 --
422 local collection = {} 411 local timer_body = function()
423 local table_insert = assert( table.insert) 412 set_debug_threadname( "LanesTimer")
424 413 --
425 local get_timers = function() 414 -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h,
426 local r = {} 415 -- [key]= { wakeup_secs [,period_secs] } [, ...] },
427 for deep, t in pairs( collection) do 416 -- }
428 -- WR( tostring( deep)) 417 --
429 local l = t[deep] 418 -- Collection of all running timers, indexed with linda's & key.
430 for key, timer_data in pairs( t) do 419 --
431 if key ~= deep then 420 -- Note that we need to use the deep lightuserdata identifiers, instead
432 table_insert( r, {l, key, timer_data}) 421 -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple
422 -- entries for the same timer.
423 --
424 -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but
425 -- also important to keep the Linda alive, even if all outside world threw
426 -- away pointers to it (which would ruin uniqueness of the deep pointer).
427 -- Now we're safe.
428 --
429 local collection = {}
430 local table_insert = assert( table.insert)
431
432 local get_timers = function()
433 local r = {}
434 for deep, t in pairs( collection) do
435 -- WR( tostring( deep))
436 local l = t[deep]
437 for key, timer_data in pairs( t) do
438 if key ~= deep then
439 table_insert( r, {l, key, timer_data})
440 end
433 end 441 end
434 end 442 end
435 end 443 return r
436 return r 444 end -- get_timers()
437 end -- get_timers()
438
439 --
440 -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] )
441 --
442 local set_timer = function( linda, key, wakeup_at, period)
443 assert( wakeup_at == nil or wakeup_at > 0.0)
444 assert( period == nil or period > 0.0)
445 445
446 local linda_deep = linda:deep()
447 assert( linda_deep)
448
449 -- Find or make a lookup for this timer
450 -- 446 --
451 local t1 = collection[linda_deep] 447 -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] )
452 if not t1 then 448 --
453 t1 = { [linda_deep] = linda} -- proxy to use the Linda 449 local set_timer = function( linda, key, wakeup_at, period)
454 collection[linda_deep] = t1 450 assert( wakeup_at == nil or wakeup_at > 0.0)
455 end 451 assert( period == nil or period > 0.0)
456 452
457 if wakeup_at == nil then 453 local linda_deep = linda:deep()
458 -- Clear the timer 454 assert( linda_deep)
459 --
460 t1[key]= nil
461 455
462 -- Remove empty tables from collection; speeds timer checks and 456 -- Find or make a lookup for this timer
463 -- lets our 'safety reference' proxy be gc:ed as well.
464 -- 457 --
465 local empty = true 458 local t1 = collection[linda_deep]
466 for k, _ in pairs( t1) do 459 if not t1 then
467 if k ~= linda_deep then 460 t1 = { [linda_deep] = linda} -- proxy to use the Linda
468 empty = false 461 collection[linda_deep] = t1
469 break
470 end
471 end
472 if empty then
473 collection[linda_deep] = nil
474 end 462 end
475 463
476 -- Note: any unread timer value is left at 'linda[key]' intensionally; 464 if wakeup_at == nil then
477 -- clearing a timer just stops it. 465 -- Clear the timer
478 else 466 --
479 -- New timer or changing the timings 467 t1[key]= nil
480 --
481 local t2 = t1[key]
482 if not t2 then
483 t2= {}
484 t1[key]= t2
485 end
486 468
487 t2[1] = wakeup_at 469 -- Remove empty tables from collection; speeds timer checks and
488 t2[2] = period -- can be 'nil' 470 -- lets our 'safety reference' proxy be gc:ed as well.
489 end 471 --
490 end -- set_timer() 472 local empty = true
473 for k, _ in pairs( t1) do
474 if k ~= linda_deep then
475 empty = false
476 break
477 end
478 end
479 if empty then
480 collection[linda_deep] = nil
481 end
491 482
492 ----- 483 -- Note: any unread timer value is left at 'linda[key]' intensionally;
493 -- [next_wakeup_at]= check_timers() 484 -- clearing a timer just stops it.
494 -- Check timers, and wake up the ones expired (if any) 485 else
495 -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none). 486 -- New timer or changing the timings
496 local check_timers = function()
497 local now = now_secs()
498 local next_wakeup
499
500 for linda_deep,t1 in pairs(collection) do
501 for key,t2 in pairs(t1) do
502 -- 487 --
503 if key==linda_deep then 488 local t2 = t1[key]
504 -- no 'continue' in Lua :/ 489 if not t2 then
505 else 490 t2= {}
506 -- 't2': { wakeup_at_secs [,period_secs] } 491 t1[key]= t2
492 end
493
494 t2[1] = wakeup_at
495 t2[2] = period -- can be 'nil'
496 end
497 end -- set_timer()
498
499 -----
500 -- [next_wakeup_at]= check_timers()
501 -- Check timers, and wake up the ones expired (if any)
502 -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none).
503 local check_timers = function()
504 local now = now_secs()
505 local next_wakeup
506
507 for linda_deep,t1 in pairs(collection) do
508 for key,t2 in pairs(t1) do
507 -- 509 --
508 local wakeup_at= t2[1] 510 if key==linda_deep then
509 local period= t2[2] -- may be 'nil' 511 -- no 'continue' in Lua :/
510 512 else
511 if wakeup_at <= now then 513 -- 't2': { wakeup_at_secs [,period_secs] }
512 local linda= t1[linda_deep] 514 --
513 assert(linda) 515 local wakeup_at= t2[1]
514 516 local period= t2[2] -- may be 'nil'
515 linda:set( key, now ) 517
516 518 if wakeup_at <= now then
517 -- 'pairs()' allows the values to be modified (and even 519 local linda= t1[linda_deep]
518 -- removed) as far as keys are not touched 520 assert(linda)
519 521
520 if not period then 522 linda:set( key, now )
521 -- one-time timer; gone 523
522 -- 524 -- 'pairs()' allows the values to be modified (and even
523 t1[key]= nil 525 -- removed) as far as keys are not touched
524 wakeup_at= nil -- no 'continue' in Lua :/ 526
525 else 527 if not period then
526 -- repeating timer; find next wakeup (may jump multiple repeats) 528 -- one-time timer; gone
527 -- 529 --
528 repeat 530 t1[key]= nil
529 wakeup_at= wakeup_at+period 531 wakeup_at= nil -- no 'continue' in Lua :/
530 until wakeup_at > now 532 else
531 533 -- repeating timer; find next wakeup (may jump multiple repeats)
532 t2[1]= wakeup_at 534 --
535 repeat
536 wakeup_at= wakeup_at+period
537 until wakeup_at > now
538
539 t2[1]= wakeup_at
540 end
533 end 541 end
534 end
535 542
536 if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then 543 if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then
537 next_wakeup= wakeup_at 544 next_wakeup= wakeup_at
545 end
538 end 546 end
547 end -- t2 loop
548 end -- t1 loop
549
550 return next_wakeup -- may be 'nil'
551 end -- check_timers()
552
553 local timer_gateway_batched = timer_gateway.batched
554 set_finalizer( function( err, stk)
555 if err and type( err) ~= "userdata" then
556 WR( "LanesTimer error: "..tostring(err))
557 --elseif type( err) == "userdata" then
558 -- WR( "LanesTimer after cancel" )
559 --else
560 -- WR("LanesTimer finalized")
561 end
562 end)
563 while true do
564 local next_wakeup = check_timers()
565
566 -- Sleep until next timer to wake up, or a set/clear command
567 --
568 local secs
569 if next_wakeup then
570 secs = next_wakeup - now_secs()
571 if secs < 0 then secs = 0 end
572 end
573 local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY)
574
575 if key == TGW_KEY then
576 assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer
577 local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3)
578 assert( key)
579 set_timer( what, key, wakeup_at, period and period > 0 and period or nil)
580 elseif key == TGW_QUERY then
581 if what == "get_timers" then
582 timer_gateway:send( TGW_REPLY, get_timers())
583 else
584 timer_gateway:send( TGW_REPLY, "unknown query " .. what)
539 end 585 end
540 end -- t2 loop 586 --elseif secs == nil then -- got no value while block-waiting?
541 end -- t1 loop 587 -- WR( "timer lane: no linda, aborted?")
542 588 end
543 return next_wakeup -- may be 'nil'
544 end -- check_timers()
545
546 local timer_gateway_batched = timer_gateway.batched
547 set_finalizer( function( err, stk)
548 if err and type( err) ~= "userdata" then
549 WR( "LanesTimer error: "..tostring(err))
550 --elseif type( err) == "userdata" then
551 -- WR( "LanesTimer after cancel" )
552 --else
553 -- WR("LanesTimer finalized")
554 end 589 end
555 end) 590 end -- timer_body()
556 while true do 591 timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility...
557 local next_wakeup = check_timers() 592 end -- first_time
558 593
559 -- Sleep until next timer to wake up, or a set/clear command 594 -----
595 -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] )
596 --
597 -- PUBLIC LANES API
598 timer = function( linda, key, a, period )
599 if getmetatable( linda) ~= "Linda" then
600 error "expecting a Linda"
601 end
602 if a == 0.0 then
603 -- Caller expects to get current time stamp in Linda, on return
604 -- (like the timer had expired instantly); it would be good to set this
605 -- as late as possible (to give most current time) but also we want it
606 -- to precede any possible timers that might start striking.
560 -- 607 --
561 local secs 608 linda:set( key, now_secs())
562 if next_wakeup then 609
563 secs = next_wakeup - now_secs() 610 if not period or period==0.0 then
564 if secs < 0 then secs = 0 end 611 timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer
565 end 612 return -- nothing more to do
566 local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY)
567
568 if key == TGW_KEY then
569 assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer
570 local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3)
571 assert( key)
572 set_timer( what, key, wakeup_at, period and period > 0 and period or nil)
573 elseif key == TGW_QUERY then
574 if what == "get_timers" then
575 timer_gateway:send( TGW_REPLY, get_timers())
576 else
577 timer_gateway:send( TGW_REPLY, "unknown query " .. what)
578 end
579 --elseif secs == nil then -- got no value while block-waiting?
580 -- WR( "timer lane: no linda, aborted?")
581 end 613 end
614 a= period
582 end 615 end
583 end -- timer_body()
584 timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility...
585 end -- first_time
586 616
587 ----- 617 local wakeup_at= type(a)=="table" and wakeup_conv(a) -- given point of time
588 -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] ) 618 or (a and now_secs()+a or nil)
589 -- 619 -- queue to timer
590 -- PUBLIC LANES API
591 timer = function( linda, key, a, period )
592 if getmetatable( linda) ~= "Linda" then
593 error "expecting a Linda"
594 end
595 if a == 0.0 then
596 -- Caller expects to get current time stamp in Linda, on return
597 -- (like the timer had expired instantly); it would be good to set this
598 -- as late as possible (to give most current time) but also we want it
599 -- to precede any possible timers that might start striking.
600 -- 620 --
601 linda:set( key, core.now_secs()) 621 timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period )
622 end -- timer()
602 623
603 if not period or period==0.0 then 624 -----
604 timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer 625 -- {[{linda, slot, when, period}[,...]]} = timers()
605 return -- nothing more to do
606 end
607 a= period
608 end
609
610 local wakeup_at= type(a)=="table" and core.wakeup_conv(a) -- given point of time
611 or (a and core.now_secs()+a or nil)
612 -- queue to timer
613 -- 626 --
614 timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period ) 627 -- PUBLIC LANES API
615 end 628 timers = function()
616 629 timer_gateway:send( TGW_QUERY, "get_timers")
617 ----- 630 local _, r = timer_gateway:receive( TGW_REPLY)
618 -- {[{linda, slot, when, period}[,...]]} = timers() 631 return r
619 -- 632 end -- timers()
620 -- PUBLIC LANES API
621 timers = function()
622 timer_gateway:send( TGW_QUERY, "get_timers")
623 local _, r = timer_gateway:receive( TGW_REPLY)
624 return r
625 end
626 633
627 end -- settings.with_timers 634 end -- settings.with_timers
628 635