diff options
Diffstat (limited to 'src/lanes.lua')
-rw-r--r-- | src/lanes.lua | 456 |
1 files changed, 234 insertions, 222 deletions
diff --git a/src/lanes.lua b/src/lanes.lua index b4c0070..fd3d22b 100644 --- a/src/lanes.lua +++ b/src/lanes.lua | |||
@@ -70,8 +70,10 @@ lanes.configure = function( settings_) | |||
70 | local default_params = | 70 | local default_params = |
71 | { | 71 | { |
72 | nb_keepers = 1, | 72 | nb_keepers = 1, |
73 | keepers_gc_threshold = -1, | ||
73 | on_state_create = nil, | 74 | on_state_create = nil, |
74 | shutdown_timeout = 0.25, | 75 | shutdown_timeout = 0.25, |
76 | shutdown_mode = "hard", | ||
75 | with_timers = true, | 77 | with_timers = true, |
76 | track_lanes = false, | 78 | track_lanes = false, |
77 | demote_full_userdata = nil, | 79 | demote_full_userdata = nil, |
@@ -91,6 +93,10 @@ lanes.configure = function( settings_) | |||
91 | -- nb_keepers should be a number > 0 | 93 | -- nb_keepers should be a number > 0 |
92 | return type( val_) == "number" and val_ > 0 | 94 | return type( val_) == "number" and val_ > 0 |
93 | end, | 95 | end, |
96 | keepers_gc_threshold = function( val_) | ||
97 | -- keepers_gc_threshold should be a number | ||
98 | return type( val_) == "number" | ||
99 | end, | ||
94 | with_timers = boolean_param_checker, | 100 | with_timers = boolean_param_checker, |
95 | allocator = function( val_) | 101 | allocator = function( val_) |
96 | -- can be nil, "protected", or a function | 102 | -- can be nil, "protected", or a function |
@@ -108,6 +114,11 @@ lanes.configure = function( settings_) | |||
108 | -- shutdown_timeout should be a number >= 0 | 114 | -- shutdown_timeout should be a number >= 0 |
109 | return type( val_) == "number" and val_ >= 0 | 115 | return type( val_) == "number" and val_ >= 0 |
110 | end, | 116 | end, |
117 | shutdown_mode = function( val_) | ||
118 | local valid_hooks = { soft = true, hard = true, call = true, ret = true, line = true, count = true } | ||
119 | -- shutdown_mode should be a known hook mask | ||
120 | return valid_hooks[val_] | ||
121 | end, | ||
111 | track_lanes = boolean_param_checker, | 122 | track_lanes = boolean_param_checker, |
112 | demote_full_userdata = boolean_param_checker, | 123 | demote_full_userdata = boolean_param_checker, |
113 | verbose_errors = boolean_param_checker | 124 | verbose_errors = boolean_param_checker |
@@ -362,262 +373,263 @@ lanes.configure = function( settings_) | |||
362 | 373 | ||
363 | 374 | ||
364 | if settings.with_timers ~= false then | 375 | if settings.with_timers ~= false then |
376 | -- | ||
377 | -- On first 'require "lanes"', a timer lane is spawned that will maintain | ||
378 | -- timer tables and sleep in between the timer events. All interaction with | ||
379 | -- the timer lane happens via a 'timer_gateway' Linda, which is common to | ||
380 | -- all that 'require "lanes"'. | ||
381 | -- | ||
382 | -- Linda protocol to timer lane: | ||
383 | -- | ||
384 | -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs] | ||
385 | -- | ||
386 | local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging | ||
387 | local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)" | ||
388 | local first_time_key= "first time" | ||
365 | 389 | ||
366 | -- | 390 | local first_time = timer_gateway:get( first_time_key) == nil |
367 | -- On first 'require "lanes"', a timer lane is spawned that will maintain | 391 | timer_gateway:set( first_time_key, true) |
368 | -- timer tables and sleep in between the timer events. All interaction with | ||
369 | -- the timer lane happens via a 'timer_gateway' Linda, which is common to | ||
370 | -- all that 'require "lanes"'. | ||
371 | -- | ||
372 | -- Linda protocol to timer lane: | ||
373 | -- | ||
374 | -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs] | ||
375 | -- | ||
376 | local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging | ||
377 | local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)" | ||
378 | local first_time_key= "first time" | ||
379 | |||
380 | local first_time = timer_gateway:get( first_time_key) == nil | ||
381 | timer_gateway:set( first_time_key, true) | ||
382 | |||
383 | -- | ||
384 | -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally | ||
385 | -- has 'table' always declared) | ||
386 | -- | ||
387 | if first_time then | ||
388 | 392 | ||
389 | local now_secs = core.now_secs | 393 | local now_secs = core.now_secs |
390 | assert( type( now_secs) == "function") | 394 | local wakeup_conv = core.wakeup_conv |
391 | ----- | 395 | |
392 | -- Snore loop (run as a lane on the background) | ||
393 | -- | ||
394 | -- High priority, to get trustworthy timings. | ||
395 | -- | 396 | -- |
396 | -- We let the timer lane be a "free running" thread; no handle to it | 397 | -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally |
397 | -- remains. | 398 | -- has 'table' always declared) |
398 | -- | 399 | -- |
399 | local timer_body = function() | 400 | if first_time then |
400 | set_debug_threadname( "LanesTimer") | 401 | |
401 | -- | 402 | assert( type( now_secs) == "function") |
402 | -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h, | 403 | ----- |
403 | -- [key]= { wakeup_secs [,period_secs] } [, ...] }, | 404 | -- Snore loop (run as a lane on the background) |
404 | -- } | ||
405 | -- | ||
406 | -- Collection of all running timers, indexed with linda's & key. | ||
407 | -- | 405 | -- |
408 | -- Note that we need to use the deep lightuserdata identifiers, instead | 406 | -- High priority, to get trustworthy timings. |
409 | -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple | ||
410 | -- entries for the same timer. | ||
411 | -- | 407 | -- |
412 | -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but | 408 | -- We let the timer lane be a "free running" thread; no handle to it |
413 | -- also important to keep the Linda alive, even if all outside world threw | 409 | -- remains. |
414 | -- away pointers to it (which would ruin uniqueness of the deep pointer). | ||
415 | -- Now we're safe. | ||
416 | -- | 410 | -- |
417 | local collection = {} | 411 | local timer_body = function() |
418 | local table_insert = assert( table.insert) | 412 | set_debug_threadname( "LanesTimer") |
419 | 413 | -- | |
420 | local get_timers = function() | 414 | -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h, |
421 | local r = {} | 415 | -- [key]= { wakeup_secs [,period_secs] } [, ...] }, |
422 | for deep, t in pairs( collection) do | 416 | -- } |
423 | -- WR( tostring( deep)) | 417 | -- |
424 | local l = t[deep] | 418 | -- Collection of all running timers, indexed with linda's & key. |
425 | for key, timer_data in pairs( t) do | 419 | -- |
426 | if key ~= deep then | 420 | -- Note that we need to use the deep lightuserdata identifiers, instead |
427 | table_insert( r, {l, key, timer_data}) | 421 | -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple |
422 | -- entries for the same timer. | ||
423 | -- | ||
424 | -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but | ||
425 | -- also important to keep the Linda alive, even if all outside world threw | ||
426 | -- away pointers to it (which would ruin uniqueness of the deep pointer). | ||
427 | -- Now we're safe. | ||
428 | -- | ||
429 | local collection = {} | ||
430 | local table_insert = assert( table.insert) | ||
431 | |||
432 | local get_timers = function() | ||
433 | local r = {} | ||
434 | for deep, t in pairs( collection) do | ||
435 | -- WR( tostring( deep)) | ||
436 | local l = t[deep] | ||
437 | for key, timer_data in pairs( t) do | ||
438 | if key ~= deep then | ||
439 | table_insert( r, {l, key, timer_data}) | ||
440 | end | ||
428 | end | 441 | end |
429 | end | 442 | end |
430 | end | 443 | return r |
431 | return r | 444 | end -- get_timers() |
432 | end -- get_timers() | ||
433 | |||
434 | -- | ||
435 | -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] ) | ||
436 | -- | ||
437 | local set_timer = function( linda, key, wakeup_at, period) | ||
438 | assert( wakeup_at == nil or wakeup_at > 0.0) | ||
439 | assert( period == nil or period > 0.0) | ||
440 | 445 | ||
441 | local linda_deep = linda:deep() | ||
442 | assert( linda_deep) | ||
443 | |||
444 | -- Find or make a lookup for this timer | ||
445 | -- | 446 | -- |
446 | local t1 = collection[linda_deep] | 447 | -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] ) |
447 | if not t1 then | 448 | -- |
448 | t1 = { [linda_deep] = linda} -- proxy to use the Linda | 449 | local set_timer = function( linda, key, wakeup_at, period) |
449 | collection[linda_deep] = t1 | 450 | assert( wakeup_at == nil or wakeup_at > 0.0) |
450 | end | 451 | assert( period == nil or period > 0.0) |
451 | 452 | ||
452 | if wakeup_at == nil then | 453 | local linda_deep = linda:deep() |
453 | -- Clear the timer | 454 | assert( linda_deep) |
454 | -- | ||
455 | t1[key]= nil | ||
456 | 455 | ||
457 | -- Remove empty tables from collection; speeds timer checks and | 456 | -- Find or make a lookup for this timer |
458 | -- lets our 'safety reference' proxy be gc:ed as well. | ||
459 | -- | 457 | -- |
460 | local empty = true | 458 | local t1 = collection[linda_deep] |
461 | for k, _ in pairs( t1) do | 459 | if not t1 then |
462 | if k ~= linda_deep then | 460 | t1 = { [linda_deep] = linda} -- proxy to use the Linda |
463 | empty = false | 461 | collection[linda_deep] = t1 |
464 | break | ||
465 | end | ||
466 | end | ||
467 | if empty then | ||
468 | collection[linda_deep] = nil | ||
469 | end | 462 | end |
470 | 463 | ||
471 | -- Note: any unread timer value is left at 'linda[key]' intensionally; | 464 | if wakeup_at == nil then |
472 | -- clearing a timer just stops it. | 465 | -- Clear the timer |
473 | else | 466 | -- |
474 | -- New timer or changing the timings | 467 | t1[key]= nil |
475 | -- | ||
476 | local t2 = t1[key] | ||
477 | if not t2 then | ||
478 | t2= {} | ||
479 | t1[key]= t2 | ||
480 | end | ||
481 | 468 | ||
482 | t2[1] = wakeup_at | 469 | -- Remove empty tables from collection; speeds timer checks and |
483 | t2[2] = period -- can be 'nil' | 470 | -- lets our 'safety reference' proxy be gc:ed as well. |
484 | end | 471 | -- |
485 | end -- set_timer() | 472 | local empty = true |
473 | for k, _ in pairs( t1) do | ||
474 | if k ~= linda_deep then | ||
475 | empty = false | ||
476 | break | ||
477 | end | ||
478 | end | ||
479 | if empty then | ||
480 | collection[linda_deep] = nil | ||
481 | end | ||
486 | 482 | ||
487 | ----- | 483 | -- Note: any unread timer value is left at 'linda[key]' intensionally; |
488 | -- [next_wakeup_at]= check_timers() | 484 | -- clearing a timer just stops it. |
489 | -- Check timers, and wake up the ones expired (if any) | 485 | else |
490 | -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none). | 486 | -- New timer or changing the timings |
491 | local check_timers = function() | ||
492 | local now = now_secs() | ||
493 | local next_wakeup | ||
494 | |||
495 | for linda_deep,t1 in pairs(collection) do | ||
496 | for key,t2 in pairs(t1) do | ||
497 | -- | 487 | -- |
498 | if key==linda_deep then | 488 | local t2 = t1[key] |
499 | -- no 'continue' in Lua :/ | 489 | if not t2 then |
500 | else | 490 | t2= {} |
501 | -- 't2': { wakeup_at_secs [,period_secs] } | 491 | t1[key]= t2 |
492 | end | ||
493 | |||
494 | t2[1] = wakeup_at | ||
495 | t2[2] = period -- can be 'nil' | ||
496 | end | ||
497 | end -- set_timer() | ||
498 | |||
499 | ----- | ||
500 | -- [next_wakeup_at]= check_timers() | ||
501 | -- Check timers, and wake up the ones expired (if any) | ||
502 | -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none). | ||
503 | local check_timers = function() | ||
504 | local now = now_secs() | ||
505 | local next_wakeup | ||
506 | |||
507 | for linda_deep,t1 in pairs(collection) do | ||
508 | for key,t2 in pairs(t1) do | ||
502 | -- | 509 | -- |
503 | local wakeup_at= t2[1] | 510 | if key==linda_deep then |
504 | local period= t2[2] -- may be 'nil' | 511 | -- no 'continue' in Lua :/ |
505 | 512 | else | |
506 | if wakeup_at <= now then | 513 | -- 't2': { wakeup_at_secs [,period_secs] } |
507 | local linda= t1[linda_deep] | 514 | -- |
508 | assert(linda) | 515 | local wakeup_at= t2[1] |
509 | 516 | local period= t2[2] -- may be 'nil' | |
510 | linda:set( key, now ) | 517 | |
511 | 518 | if wakeup_at <= now then | |
512 | -- 'pairs()' allows the values to be modified (and even | 519 | local linda= t1[linda_deep] |
513 | -- removed) as far as keys are not touched | 520 | assert(linda) |
514 | 521 | ||
515 | if not period then | 522 | linda:set( key, now ) |
516 | -- one-time timer; gone | 523 | |
517 | -- | 524 | -- 'pairs()' allows the values to be modified (and even |
518 | t1[key]= nil | 525 | -- removed) as far as keys are not touched |
519 | wakeup_at= nil -- no 'continue' in Lua :/ | 526 | |
520 | else | 527 | if not period then |
521 | -- repeating timer; find next wakeup (may jump multiple repeats) | 528 | -- one-time timer; gone |
522 | -- | 529 | -- |
523 | repeat | 530 | t1[key]= nil |
524 | wakeup_at= wakeup_at+period | 531 | wakeup_at= nil -- no 'continue' in Lua :/ |
525 | until wakeup_at > now | 532 | else |
526 | 533 | -- repeating timer; find next wakeup (may jump multiple repeats) | |
527 | t2[1]= wakeup_at | 534 | -- |
535 | repeat | ||
536 | wakeup_at= wakeup_at+period | ||
537 | until wakeup_at > now | ||
538 | |||
539 | t2[1]= wakeup_at | ||
540 | end | ||
528 | end | 541 | end |
529 | end | ||
530 | 542 | ||
531 | if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then | 543 | if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then |
532 | next_wakeup= wakeup_at | 544 | next_wakeup= wakeup_at |
545 | end | ||
533 | end | 546 | end |
547 | end -- t2 loop | ||
548 | end -- t1 loop | ||
549 | |||
550 | return next_wakeup -- may be 'nil' | ||
551 | end -- check_timers() | ||
552 | |||
553 | local timer_gateway_batched = timer_gateway.batched | ||
554 | set_finalizer( function( err, stk) | ||
555 | if err and type( err) ~= "userdata" then | ||
556 | WR( "LanesTimer error: "..tostring(err)) | ||
557 | --elseif type( err) == "userdata" then | ||
558 | -- WR( "LanesTimer after cancel" ) | ||
559 | --else | ||
560 | -- WR("LanesTimer finalized") | ||
561 | end | ||
562 | end) | ||
563 | while true do | ||
564 | local next_wakeup = check_timers() | ||
565 | |||
566 | -- Sleep until next timer to wake up, or a set/clear command | ||
567 | -- | ||
568 | local secs | ||
569 | if next_wakeup then | ||
570 | secs = next_wakeup - now_secs() | ||
571 | if secs < 0 then secs = 0 end | ||
572 | end | ||
573 | local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY) | ||
574 | |||
575 | if key == TGW_KEY then | ||
576 | assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer | ||
577 | local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3) | ||
578 | assert( key) | ||
579 | set_timer( what, key, wakeup_at, period and period > 0 and period or nil) | ||
580 | elseif key == TGW_QUERY then | ||
581 | if what == "get_timers" then | ||
582 | timer_gateway:send( TGW_REPLY, get_timers()) | ||
583 | else | ||
584 | timer_gateway:send( TGW_REPLY, "unknown query " .. what) | ||
534 | end | 585 | end |
535 | end -- t2 loop | 586 | --elseif secs == nil then -- got no value while block-waiting? |
536 | end -- t1 loop | 587 | -- WR( "timer lane: no linda, aborted?") |
537 | 588 | end | |
538 | return next_wakeup -- may be 'nil' | ||
539 | end -- check_timers() | ||
540 | |||
541 | local timer_gateway_batched = timer_gateway.batched | ||
542 | set_finalizer( function( err, stk) | ||
543 | if err and type( err) ~= "userdata" then | ||
544 | WR( "LanesTimer error: "..tostring(err)) | ||
545 | --elseif type( err) == "userdata" then | ||
546 | -- WR( "LanesTimer after cancel" ) | ||
547 | --else | ||
548 | -- WR("LanesTimer finalized") | ||
549 | end | 589 | end |
550 | end) | 590 | end -- timer_body() |
551 | while true do | 591 | timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility... |
552 | local next_wakeup = check_timers() | 592 | end -- first_time |
553 | 593 | ||
554 | -- Sleep until next timer to wake up, or a set/clear command | 594 | ----- |
595 | -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] ) | ||
596 | -- | ||
597 | -- PUBLIC LANES API | ||
598 | timer = function( linda, key, a, period ) | ||
599 | if getmetatable( linda) ~= "Linda" then | ||
600 | error "expecting a Linda" | ||
601 | end | ||
602 | if a == 0.0 then | ||
603 | -- Caller expects to get current time stamp in Linda, on return | ||
604 | -- (like the timer had expired instantly); it would be good to set this | ||
605 | -- as late as possible (to give most current time) but also we want it | ||
606 | -- to precede any possible timers that might start striking. | ||
555 | -- | 607 | -- |
556 | local secs | 608 | linda:set( key, now_secs()) |
557 | if next_wakeup then | 609 | |
558 | secs = next_wakeup - now_secs() | 610 | if not period or period==0.0 then |
559 | if secs < 0 then secs = 0 end | 611 | timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer |
560 | end | 612 | return -- nothing more to do |
561 | local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY) | ||
562 | |||
563 | if key == TGW_KEY then | ||
564 | assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer | ||
565 | local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3) | ||
566 | assert( key) | ||
567 | set_timer( what, key, wakeup_at, period and period > 0 and period or nil) | ||
568 | elseif key == TGW_QUERY then | ||
569 | if what == "get_timers" then | ||
570 | timer_gateway:send( TGW_REPLY, get_timers()) | ||
571 | else | ||
572 | timer_gateway:send( TGW_REPLY, "unknown query " .. what) | ||
573 | end | ||
574 | --elseif secs == nil then -- got no value while block-waiting? | ||
575 | -- WR( "timer lane: no linda, aborted?") | ||
576 | end | 613 | end |
614 | a= period | ||
577 | end | 615 | end |
578 | end -- timer_body() | ||
579 | timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility... | ||
580 | end -- first_time | ||
581 | 616 | ||
582 | ----- | 617 | local wakeup_at= type(a)=="table" and wakeup_conv(a) -- given point of time |
583 | -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] ) | 618 | or (a and now_secs()+a or nil) |
584 | -- | 619 | -- queue to timer |
585 | -- PUBLIC LANES API | ||
586 | timer = function( linda, key, a, period ) | ||
587 | if getmetatable( linda) ~= "Linda" then | ||
588 | error "expecting a Linda" | ||
589 | end | ||
590 | if a == 0.0 then | ||
591 | -- Caller expects to get current time stamp in Linda, on return | ||
592 | -- (like the timer had expired instantly); it would be good to set this | ||
593 | -- as late as possible (to give most current time) but also we want it | ||
594 | -- to precede any possible timers that might start striking. | ||
595 | -- | 620 | -- |
596 | linda:set( key, core.now_secs()) | 621 | timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period ) |
622 | end -- timer() | ||
597 | 623 | ||
598 | if not period or period==0.0 then | 624 | ----- |
599 | timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer | 625 | -- {[{linda, slot, when, period}[,...]]} = timers() |
600 | return -- nothing more to do | ||
601 | end | ||
602 | a= period | ||
603 | end | ||
604 | |||
605 | local wakeup_at= type(a)=="table" and core.wakeup_conv(a) -- given point of time | ||
606 | or (a and core.now_secs()+a or nil) | ||
607 | -- queue to timer | ||
608 | -- | 626 | -- |
609 | timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period ) | 627 | -- PUBLIC LANES API |
610 | end | 628 | timers = function() |
611 | 629 | timer_gateway:send( TGW_QUERY, "get_timers") | |
612 | ----- | 630 | local _, r = timer_gateway:receive( TGW_REPLY) |
613 | -- {[{linda, slot, when, period}[,...]]} = timers() | 631 | return r |
614 | -- | 632 | end -- timers() |
615 | -- PUBLIC LANES API | ||
616 | timers = function() | ||
617 | timer_gateway:send( TGW_QUERY, "get_timers") | ||
618 | local _, r = timer_gateway:receive( TGW_REPLY) | ||
619 | return r | ||
620 | end | ||
621 | 633 | ||
622 | end -- settings.with_timers | 634 | end -- settings.with_timers |
623 | 635 | ||