diff options
Diffstat (limited to 'src/lanes.lua')
-rw-r--r-- | src/lanes.lua | 451 |
1 files changed, 229 insertions, 222 deletions
diff --git a/src/lanes.lua b/src/lanes.lua index 6af286a..fd3d22b 100644 --- a/src/lanes.lua +++ b/src/lanes.lua | |||
@@ -73,6 +73,7 @@ lanes.configure = function( settings_) | |||
73 | keepers_gc_threshold = -1, | 73 | keepers_gc_threshold = -1, |
74 | on_state_create = nil, | 74 | on_state_create = nil, |
75 | shutdown_timeout = 0.25, | 75 | shutdown_timeout = 0.25, |
76 | shutdown_mode = "hard", | ||
76 | with_timers = true, | 77 | with_timers = true, |
77 | track_lanes = false, | 78 | track_lanes = false, |
78 | demote_full_userdata = nil, | 79 | demote_full_userdata = nil, |
@@ -113,6 +114,11 @@ lanes.configure = function( settings_) | |||
113 | -- shutdown_timeout should be a number >= 0 | 114 | -- shutdown_timeout should be a number >= 0 |
114 | return type( val_) == "number" and val_ >= 0 | 115 | return type( val_) == "number" and val_ >= 0 |
115 | end, | 116 | end, |
117 | shutdown_mode = function( val_) | ||
118 | local valid_hooks = { soft = true, hard = true, call = true, ret = true, line = true, count = true } | ||
119 | -- shutdown_mode should be a known hook mask | ||
120 | return valid_hooks[val_] | ||
121 | end, | ||
116 | track_lanes = boolean_param_checker, | 122 | track_lanes = boolean_param_checker, |
117 | demote_full_userdata = boolean_param_checker, | 123 | demote_full_userdata = boolean_param_checker, |
118 | verbose_errors = boolean_param_checker | 124 | verbose_errors = boolean_param_checker |
@@ -367,262 +373,263 @@ lanes.configure = function( settings_) | |||
367 | 373 | ||
368 | 374 | ||
369 | if settings.with_timers ~= false then | 375 | if settings.with_timers ~= false then |
376 | -- | ||
377 | -- On first 'require "lanes"', a timer lane is spawned that will maintain | ||
378 | -- timer tables and sleep in between the timer events. All interaction with | ||
379 | -- the timer lane happens via a 'timer_gateway' Linda, which is common to | ||
380 | -- all that 'require "lanes"'. | ||
381 | -- | ||
382 | -- Linda protocol to timer lane: | ||
383 | -- | ||
384 | -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs] | ||
385 | -- | ||
386 | local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging | ||
387 | local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)" | ||
388 | local first_time_key= "first time" | ||
370 | 389 | ||
371 | -- | 390 | local first_time = timer_gateway:get( first_time_key) == nil |
372 | -- On first 'require "lanes"', a timer lane is spawned that will maintain | 391 | timer_gateway:set( first_time_key, true) |
373 | -- timer tables and sleep in between the timer events. All interaction with | ||
374 | -- the timer lane happens via a 'timer_gateway' Linda, which is common to | ||
375 | -- all that 'require "lanes"'. | ||
376 | -- | ||
377 | -- Linda protocol to timer lane: | ||
378 | -- | ||
379 | -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs] | ||
380 | -- | ||
381 | local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging | ||
382 | local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)" | ||
383 | local first_time_key= "first time" | ||
384 | |||
385 | local first_time = timer_gateway:get( first_time_key) == nil | ||
386 | timer_gateway:set( first_time_key, true) | ||
387 | |||
388 | -- | ||
389 | -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally | ||
390 | -- has 'table' always declared) | ||
391 | -- | ||
392 | if first_time then | ||
393 | 392 | ||
394 | local now_secs = core.now_secs | 393 | local now_secs = core.now_secs |
395 | assert( type( now_secs) == "function") | 394 | local wakeup_conv = core.wakeup_conv |
396 | ----- | 395 | |
397 | -- Snore loop (run as a lane on the background) | ||
398 | -- | ||
399 | -- High priority, to get trustworthy timings. | ||
400 | -- | 396 | -- |
401 | -- We let the timer lane be a "free running" thread; no handle to it | 397 | -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally |
402 | -- remains. | 398 | -- has 'table' always declared) |
403 | -- | 399 | -- |
404 | local timer_body = function() | 400 | if first_time then |
405 | set_debug_threadname( "LanesTimer") | 401 | |
406 | -- | 402 | assert( type( now_secs) == "function") |
407 | -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h, | 403 | ----- |
408 | -- [key]= { wakeup_secs [,period_secs] } [, ...] }, | 404 | -- Snore loop (run as a lane on the background) |
409 | -- } | ||
410 | -- | ||
411 | -- Collection of all running timers, indexed with linda's & key. | ||
412 | -- | 405 | -- |
413 | -- Note that we need to use the deep lightuserdata identifiers, instead | 406 | -- High priority, to get trustworthy timings. |
414 | -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple | ||
415 | -- entries for the same timer. | ||
416 | -- | 407 | -- |
417 | -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but | 408 | -- We let the timer lane be a "free running" thread; no handle to it |
418 | -- also important to keep the Linda alive, even if all outside world threw | 409 | -- remains. |
419 | -- away pointers to it (which would ruin uniqueness of the deep pointer). | ||
420 | -- Now we're safe. | ||
421 | -- | 410 | -- |
422 | local collection = {} | 411 | local timer_body = function() |
423 | local table_insert = assert( table.insert) | 412 | set_debug_threadname( "LanesTimer") |
424 | 413 | -- | |
425 | local get_timers = function() | 414 | -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h, |
426 | local r = {} | 415 | -- [key]= { wakeup_secs [,period_secs] } [, ...] }, |
427 | for deep, t in pairs( collection) do | 416 | -- } |
428 | -- WR( tostring( deep)) | 417 | -- |
429 | local l = t[deep] | 418 | -- Collection of all running timers, indexed with linda's & key. |
430 | for key, timer_data in pairs( t) do | 419 | -- |
431 | if key ~= deep then | 420 | -- Note that we need to use the deep lightuserdata identifiers, instead |
432 | table_insert( r, {l, key, timer_data}) | 421 | -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple |
422 | -- entries for the same timer. | ||
423 | -- | ||
424 | -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but | ||
425 | -- also important to keep the Linda alive, even if all outside world threw | ||
426 | -- away pointers to it (which would ruin uniqueness of the deep pointer). | ||
427 | -- Now we're safe. | ||
428 | -- | ||
429 | local collection = {} | ||
430 | local table_insert = assert( table.insert) | ||
431 | |||
432 | local get_timers = function() | ||
433 | local r = {} | ||
434 | for deep, t in pairs( collection) do | ||
435 | -- WR( tostring( deep)) | ||
436 | local l = t[deep] | ||
437 | for key, timer_data in pairs( t) do | ||
438 | if key ~= deep then | ||
439 | table_insert( r, {l, key, timer_data}) | ||
440 | end | ||
433 | end | 441 | end |
434 | end | 442 | end |
435 | end | 443 | return r |
436 | return r | 444 | end -- get_timers() |
437 | end -- get_timers() | ||
438 | |||
439 | -- | ||
440 | -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] ) | ||
441 | -- | ||
442 | local set_timer = function( linda, key, wakeup_at, period) | ||
443 | assert( wakeup_at == nil or wakeup_at > 0.0) | ||
444 | assert( period == nil or period > 0.0) | ||
445 | 445 | ||
446 | local linda_deep = linda:deep() | ||
447 | assert( linda_deep) | ||
448 | |||
449 | -- Find or make a lookup for this timer | ||
450 | -- | 446 | -- |
451 | local t1 = collection[linda_deep] | 447 | -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] ) |
452 | if not t1 then | 448 | -- |
453 | t1 = { [linda_deep] = linda} -- proxy to use the Linda | 449 | local set_timer = function( linda, key, wakeup_at, period) |
454 | collection[linda_deep] = t1 | 450 | assert( wakeup_at == nil or wakeup_at > 0.0) |
455 | end | 451 | assert( period == nil or period > 0.0) |
456 | 452 | ||
457 | if wakeup_at == nil then | 453 | local linda_deep = linda:deep() |
458 | -- Clear the timer | 454 | assert( linda_deep) |
459 | -- | ||
460 | t1[key]= nil | ||
461 | 455 | ||
462 | -- Remove empty tables from collection; speeds timer checks and | 456 | -- Find or make a lookup for this timer |
463 | -- lets our 'safety reference' proxy be gc:ed as well. | ||
464 | -- | 457 | -- |
465 | local empty = true | 458 | local t1 = collection[linda_deep] |
466 | for k, _ in pairs( t1) do | 459 | if not t1 then |
467 | if k ~= linda_deep then | 460 | t1 = { [linda_deep] = linda} -- proxy to use the Linda |
468 | empty = false | 461 | collection[linda_deep] = t1 |
469 | break | ||
470 | end | ||
471 | end | ||
472 | if empty then | ||
473 | collection[linda_deep] = nil | ||
474 | end | 462 | end |
475 | 463 | ||
476 | -- Note: any unread timer value is left at 'linda[key]' intensionally; | 464 | if wakeup_at == nil then |
477 | -- clearing a timer just stops it. | 465 | -- Clear the timer |
478 | else | 466 | -- |
479 | -- New timer or changing the timings | 467 | t1[key]= nil |
480 | -- | ||
481 | local t2 = t1[key] | ||
482 | if not t2 then | ||
483 | t2= {} | ||
484 | t1[key]= t2 | ||
485 | end | ||
486 | 468 | ||
487 | t2[1] = wakeup_at | 469 | -- Remove empty tables from collection; speeds timer checks and |
488 | t2[2] = period -- can be 'nil' | 470 | -- lets our 'safety reference' proxy be gc:ed as well. |
489 | end | 471 | -- |
490 | end -- set_timer() | 472 | local empty = true |
473 | for k, _ in pairs( t1) do | ||
474 | if k ~= linda_deep then | ||
475 | empty = false | ||
476 | break | ||
477 | end | ||
478 | end | ||
479 | if empty then | ||
480 | collection[linda_deep] = nil | ||
481 | end | ||
491 | 482 | ||
492 | ----- | 483 | -- Note: any unread timer value is left at 'linda[key]' intensionally; |
493 | -- [next_wakeup_at]= check_timers() | 484 | -- clearing a timer just stops it. |
494 | -- Check timers, and wake up the ones expired (if any) | 485 | else |
495 | -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none). | 486 | -- New timer or changing the timings |
496 | local check_timers = function() | ||
497 | local now = now_secs() | ||
498 | local next_wakeup | ||
499 | |||
500 | for linda_deep,t1 in pairs(collection) do | ||
501 | for key,t2 in pairs(t1) do | ||
502 | -- | 487 | -- |
503 | if key==linda_deep then | 488 | local t2 = t1[key] |
504 | -- no 'continue' in Lua :/ | 489 | if not t2 then |
505 | else | 490 | t2= {} |
506 | -- 't2': { wakeup_at_secs [,period_secs] } | 491 | t1[key]= t2 |
492 | end | ||
493 | |||
494 | t2[1] = wakeup_at | ||
495 | t2[2] = period -- can be 'nil' | ||
496 | end | ||
497 | end -- set_timer() | ||
498 | |||
499 | ----- | ||
500 | -- [next_wakeup_at]= check_timers() | ||
501 | -- Check timers, and wake up the ones expired (if any) | ||
502 | -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none). | ||
503 | local check_timers = function() | ||
504 | local now = now_secs() | ||
505 | local next_wakeup | ||
506 | |||
507 | for linda_deep,t1 in pairs(collection) do | ||
508 | for key,t2 in pairs(t1) do | ||
507 | -- | 509 | -- |
508 | local wakeup_at= t2[1] | 510 | if key==linda_deep then |
509 | local period= t2[2] -- may be 'nil' | 511 | -- no 'continue' in Lua :/ |
510 | 512 | else | |
511 | if wakeup_at <= now then | 513 | -- 't2': { wakeup_at_secs [,period_secs] } |
512 | local linda= t1[linda_deep] | 514 | -- |
513 | assert(linda) | 515 | local wakeup_at= t2[1] |
514 | 516 | local period= t2[2] -- may be 'nil' | |
515 | linda:set( key, now ) | 517 | |
516 | 518 | if wakeup_at <= now then | |
517 | -- 'pairs()' allows the values to be modified (and even | 519 | local linda= t1[linda_deep] |
518 | -- removed) as far as keys are not touched | 520 | assert(linda) |
519 | 521 | ||
520 | if not period then | 522 | linda:set( key, now ) |
521 | -- one-time timer; gone | 523 | |
522 | -- | 524 | -- 'pairs()' allows the values to be modified (and even |
523 | t1[key]= nil | 525 | -- removed) as far as keys are not touched |
524 | wakeup_at= nil -- no 'continue' in Lua :/ | 526 | |
525 | else | 527 | if not period then |
526 | -- repeating timer; find next wakeup (may jump multiple repeats) | 528 | -- one-time timer; gone |
527 | -- | 529 | -- |
528 | repeat | 530 | t1[key]= nil |
529 | wakeup_at= wakeup_at+period | 531 | wakeup_at= nil -- no 'continue' in Lua :/ |
530 | until wakeup_at > now | 532 | else |
531 | 533 | -- repeating timer; find next wakeup (may jump multiple repeats) | |
532 | t2[1]= wakeup_at | 534 | -- |
535 | repeat | ||
536 | wakeup_at= wakeup_at+period | ||
537 | until wakeup_at > now | ||
538 | |||
539 | t2[1]= wakeup_at | ||
540 | end | ||
533 | end | 541 | end |
534 | end | ||
535 | 542 | ||
536 | if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then | 543 | if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then |
537 | next_wakeup= wakeup_at | 544 | next_wakeup= wakeup_at |
545 | end | ||
538 | end | 546 | end |
547 | end -- t2 loop | ||
548 | end -- t1 loop | ||
549 | |||
550 | return next_wakeup -- may be 'nil' | ||
551 | end -- check_timers() | ||
552 | |||
553 | local timer_gateway_batched = timer_gateway.batched | ||
554 | set_finalizer( function( err, stk) | ||
555 | if err and type( err) ~= "userdata" then | ||
556 | WR( "LanesTimer error: "..tostring(err)) | ||
557 | --elseif type( err) == "userdata" then | ||
558 | -- WR( "LanesTimer after cancel" ) | ||
559 | --else | ||
560 | -- WR("LanesTimer finalized") | ||
561 | end | ||
562 | end) | ||
563 | while true do | ||
564 | local next_wakeup = check_timers() | ||
565 | |||
566 | -- Sleep until next timer to wake up, or a set/clear command | ||
567 | -- | ||
568 | local secs | ||
569 | if next_wakeup then | ||
570 | secs = next_wakeup - now_secs() | ||
571 | if secs < 0 then secs = 0 end | ||
572 | end | ||
573 | local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY) | ||
574 | |||
575 | if key == TGW_KEY then | ||
576 | assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer | ||
577 | local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3) | ||
578 | assert( key) | ||
579 | set_timer( what, key, wakeup_at, period and period > 0 and period or nil) | ||
580 | elseif key == TGW_QUERY then | ||
581 | if what == "get_timers" then | ||
582 | timer_gateway:send( TGW_REPLY, get_timers()) | ||
583 | else | ||
584 | timer_gateway:send( TGW_REPLY, "unknown query " .. what) | ||
539 | end | 585 | end |
540 | end -- t2 loop | 586 | --elseif secs == nil then -- got no value while block-waiting? |
541 | end -- t1 loop | 587 | -- WR( "timer lane: no linda, aborted?") |
542 | 588 | end | |
543 | return next_wakeup -- may be 'nil' | ||
544 | end -- check_timers() | ||
545 | |||
546 | local timer_gateway_batched = timer_gateway.batched | ||
547 | set_finalizer( function( err, stk) | ||
548 | if err and type( err) ~= "userdata" then | ||
549 | WR( "LanesTimer error: "..tostring(err)) | ||
550 | --elseif type( err) == "userdata" then | ||
551 | -- WR( "LanesTimer after cancel" ) | ||
552 | --else | ||
553 | -- WR("LanesTimer finalized") | ||
554 | end | 589 | end |
555 | end) | 590 | end -- timer_body() |
556 | while true do | 591 | timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility... |
557 | local next_wakeup = check_timers() | 592 | end -- first_time |
558 | 593 | ||
559 | -- Sleep until next timer to wake up, or a set/clear command | 594 | ----- |
595 | -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] ) | ||
596 | -- | ||
597 | -- PUBLIC LANES API | ||
598 | timer = function( linda, key, a, period ) | ||
599 | if getmetatable( linda) ~= "Linda" then | ||
600 | error "expecting a Linda" | ||
601 | end | ||
602 | if a == 0.0 then | ||
603 | -- Caller expects to get current time stamp in Linda, on return | ||
604 | -- (like the timer had expired instantly); it would be good to set this | ||
605 | -- as late as possible (to give most current time) but also we want it | ||
606 | -- to precede any possible timers that might start striking. | ||
560 | -- | 607 | -- |
561 | local secs | 608 | linda:set( key, now_secs()) |
562 | if next_wakeup then | 609 | |
563 | secs = next_wakeup - now_secs() | 610 | if not period or period==0.0 then |
564 | if secs < 0 then secs = 0 end | 611 | timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer |
565 | end | 612 | return -- nothing more to do |
566 | local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY) | ||
567 | |||
568 | if key == TGW_KEY then | ||
569 | assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer | ||
570 | local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3) | ||
571 | assert( key) | ||
572 | set_timer( what, key, wakeup_at, period and period > 0 and period or nil) | ||
573 | elseif key == TGW_QUERY then | ||
574 | if what == "get_timers" then | ||
575 | timer_gateway:send( TGW_REPLY, get_timers()) | ||
576 | else | ||
577 | timer_gateway:send( TGW_REPLY, "unknown query " .. what) | ||
578 | end | ||
579 | --elseif secs == nil then -- got no value while block-waiting? | ||
580 | -- WR( "timer lane: no linda, aborted?") | ||
581 | end | 613 | end |
614 | a= period | ||
582 | end | 615 | end |
583 | end -- timer_body() | ||
584 | timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility... | ||
585 | end -- first_time | ||
586 | 616 | ||
587 | ----- | 617 | local wakeup_at= type(a)=="table" and wakeup_conv(a) -- given point of time |
588 | -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] ) | 618 | or (a and now_secs()+a or nil) |
589 | -- | 619 | -- queue to timer |
590 | -- PUBLIC LANES API | ||
591 | timer = function( linda, key, a, period ) | ||
592 | if getmetatable( linda) ~= "Linda" then | ||
593 | error "expecting a Linda" | ||
594 | end | ||
595 | if a == 0.0 then | ||
596 | -- Caller expects to get current time stamp in Linda, on return | ||
597 | -- (like the timer had expired instantly); it would be good to set this | ||
598 | -- as late as possible (to give most current time) but also we want it | ||
599 | -- to precede any possible timers that might start striking. | ||
600 | -- | 620 | -- |
601 | linda:set( key, core.now_secs()) | 621 | timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period ) |
622 | end -- timer() | ||
602 | 623 | ||
603 | if not period or period==0.0 then | 624 | ----- |
604 | timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer | 625 | -- {[{linda, slot, when, period}[,...]]} = timers() |
605 | return -- nothing more to do | ||
606 | end | ||
607 | a= period | ||
608 | end | ||
609 | |||
610 | local wakeup_at= type(a)=="table" and core.wakeup_conv(a) -- given point of time | ||
611 | or (a and core.now_secs()+a or nil) | ||
612 | -- queue to timer | ||
613 | -- | 626 | -- |
614 | timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period ) | 627 | -- PUBLIC LANES API |
615 | end | 628 | timers = function() |
616 | 629 | timer_gateway:send( TGW_QUERY, "get_timers") | |
617 | ----- | 630 | local _, r = timer_gateway:receive( TGW_REPLY) |
618 | -- {[{linda, slot, when, period}[,...]]} = timers() | 631 | return r |
619 | -- | 632 | end -- timers() |
620 | -- PUBLIC LANES API | ||
621 | timers = function() | ||
622 | timer_gateway:send( TGW_QUERY, "get_timers") | ||
623 | local _, r = timer_gateway:receive( TGW_REPLY) | ||
624 | return r | ||
625 | end | ||
626 | 633 | ||
627 | end -- settings.with_timers | 634 | end -- settings.with_timers |
628 | 635 | ||