diff options
Diffstat (limited to 'src/lanes.lua')
-rw-r--r-- | src/lanes.lua | 449 |
1 files changed, 228 insertions, 221 deletions
diff --git a/src/lanes.lua b/src/lanes.lua index b4c0070..49900f9 100644 --- a/src/lanes.lua +++ b/src/lanes.lua | |||
@@ -70,6 +70,7 @@ lanes.configure = function( settings_) | |||
70 | local default_params = | 70 | local default_params = |
71 | { | 71 | { |
72 | nb_keepers = 1, | 72 | nb_keepers = 1, |
73 | keepers_gc_threshold = -1, | ||
73 | on_state_create = nil, | 74 | on_state_create = nil, |
74 | shutdown_timeout = 0.25, | 75 | shutdown_timeout = 0.25, |
75 | with_timers = true, | 76 | with_timers = true, |
@@ -91,6 +92,10 @@ lanes.configure = function( settings_) | |||
91 | -- nb_keepers should be a number > 0 | 92 | -- nb_keepers should be a number > 0 |
92 | return type( val_) == "number" and val_ > 0 | 93 | return type( val_) == "number" and val_ > 0 |
93 | end, | 94 | end, |
95 | keepers_gc_threshold = function( val_) | ||
96 | -- keepers_gc_threshold should be a number | ||
97 | return type( val_) == "number" | ||
98 | end, | ||
94 | with_timers = boolean_param_checker, | 99 | with_timers = boolean_param_checker, |
95 | allocator = function( val_) | 100 | allocator = function( val_) |
96 | -- can be nil, "protected", or a function | 101 | -- can be nil, "protected", or a function |
@@ -363,261 +368,263 @@ lanes.configure = function( settings_) | |||
363 | 368 | ||
364 | if settings.with_timers ~= false then | 369 | if settings.with_timers ~= false then |
365 | 370 | ||
366 | -- | 371 | -- |
367 | -- On first 'require "lanes"', a timer lane is spawned that will maintain | 372 | -- On first 'require "lanes"', a timer lane is spawned that will maintain |
368 | -- timer tables and sleep in between the timer events. All interaction with | 373 | -- timer tables and sleep in between the timer events. All interaction with |
369 | -- the timer lane happens via a 'timer_gateway' Linda, which is common to | 374 | -- the timer lane happens via a 'timer_gateway' Linda, which is common to |
370 | -- all that 'require "lanes"'. | 375 | -- all that 'require "lanes"'. |
371 | -- | 376 | -- |
372 | -- Linda protocol to timer lane: | 377 | -- Linda protocol to timer lane: |
373 | -- | 378 | -- |
374 | -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs] | 379 | -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs] |
375 | -- | 380 | -- |
376 | local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging | 381 | local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging |
377 | local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)" | 382 | local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)" |
378 | local first_time_key= "first time" | 383 | local first_time_key= "first time" |
379 | |||
380 | local first_time = timer_gateway:get( first_time_key) == nil | ||
381 | timer_gateway:set( first_time_key, true) | ||
382 | 384 | ||
383 | -- | 385 | local first_time = timer_gateway:get( first_time_key) == nil |
384 | -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally | 386 | timer_gateway:set( first_time_key, true) |
385 | -- has 'table' always declared) | ||
386 | -- | ||
387 | if first_time then | ||
388 | 387 | ||
389 | local now_secs = core.now_secs | 388 | local now_secs = core.now_secs |
390 | assert( type( now_secs) == "function") | 389 | local wakeup_conv = core.wakeup_conv |
391 | ----- | 390 | |
392 | -- Snore loop (run as a lane on the background) | ||
393 | -- | ||
394 | -- High priority, to get trustworthy timings. | ||
395 | -- | 391 | -- |
396 | -- We let the timer lane be a "free running" thread; no handle to it | 392 | -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally |
397 | -- remains. | 393 | -- has 'table' always declared) |
398 | -- | 394 | -- |
399 | local timer_body = function() | 395 | if first_time then |
400 | set_debug_threadname( "LanesTimer") | 396 | |
401 | -- | 397 | assert( type( now_secs) == "function") |
402 | -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h, | 398 | ----- |
403 | -- [key]= { wakeup_secs [,period_secs] } [, ...] }, | 399 | -- Snore loop (run as a lane on the background) |
404 | -- } | ||
405 | -- | ||
406 | -- Collection of all running timers, indexed with linda's & key. | ||
407 | -- | 400 | -- |
408 | -- Note that we need to use the deep lightuserdata identifiers, instead | 401 | -- High priority, to get trustworthy timings. |
409 | -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple | ||
410 | -- entries for the same timer. | ||
411 | -- | 402 | -- |
412 | -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but | 403 | -- We let the timer lane be a "free running" thread; no handle to it |
413 | -- also important to keep the Linda alive, even if all outside world threw | 404 | -- remains. |
414 | -- away pointers to it (which would ruin uniqueness of the deep pointer). | ||
415 | -- Now we're safe. | ||
416 | -- | 405 | -- |
417 | local collection = {} | 406 | local timer_body = function() |
418 | local table_insert = assert( table.insert) | 407 | set_debug_threadname( "LanesTimer") |
419 | 408 | -- | |
420 | local get_timers = function() | 409 | -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h, |
421 | local r = {} | 410 | -- [key]= { wakeup_secs [,period_secs] } [, ...] }, |
422 | for deep, t in pairs( collection) do | 411 | -- } |
423 | -- WR( tostring( deep)) | 412 | -- |
424 | local l = t[deep] | 413 | -- Collection of all running timers, indexed with linda's & key. |
425 | for key, timer_data in pairs( t) do | 414 | -- |
426 | if key ~= deep then | 415 | -- Note that we need to use the deep lightuserdata identifiers, instead |
427 | table_insert( r, {l, key, timer_data}) | 416 | -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple |
417 | -- entries for the same timer. | ||
418 | -- | ||
419 | -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but | ||
420 | -- also important to keep the Linda alive, even if all outside world threw | ||
421 | -- away pointers to it (which would ruin uniqueness of the deep pointer). | ||
422 | -- Now we're safe. | ||
423 | -- | ||
424 | local collection = {} | ||
425 | local table_insert = assert( table.insert) | ||
426 | |||
427 | local get_timers = function() | ||
428 | local r = {} | ||
429 | for deep, t in pairs( collection) do | ||
430 | -- WR( tostring( deep)) | ||
431 | local l = t[deep] | ||
432 | for key, timer_data in pairs( t) do | ||
433 | if key ~= deep then | ||
434 | table_insert( r, {l, key, timer_data}) | ||
435 | end | ||
428 | end | 436 | end |
429 | end | 437 | end |
430 | end | 438 | return r |
431 | return r | 439 | end -- get_timers() |
432 | end -- get_timers() | ||
433 | |||
434 | -- | ||
435 | -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] ) | ||
436 | -- | ||
437 | local set_timer = function( linda, key, wakeup_at, period) | ||
438 | assert( wakeup_at == nil or wakeup_at > 0.0) | ||
439 | assert( period == nil or period > 0.0) | ||
440 | |||
441 | local linda_deep = linda:deep() | ||
442 | assert( linda_deep) | ||
443 | 440 | ||
444 | -- Find or make a lookup for this timer | ||
445 | -- | 441 | -- |
446 | local t1 = collection[linda_deep] | 442 | -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] ) |
447 | if not t1 then | 443 | -- |
448 | t1 = { [linda_deep] = linda} -- proxy to use the Linda | 444 | local set_timer = function( linda, key, wakeup_at, period) |
449 | collection[linda_deep] = t1 | 445 | assert( wakeup_at == nil or wakeup_at > 0.0) |
450 | end | 446 | assert( period == nil or period > 0.0) |
451 | 447 | ||
452 | if wakeup_at == nil then | 448 | local linda_deep = linda:deep() |
453 | -- Clear the timer | 449 | assert( linda_deep) |
454 | -- | ||
455 | t1[key]= nil | ||
456 | 450 | ||
457 | -- Remove empty tables from collection; speeds timer checks and | 451 | -- Find or make a lookup for this timer |
458 | -- lets our 'safety reference' proxy be gc:ed as well. | ||
459 | -- | 452 | -- |
460 | local empty = true | 453 | local t1 = collection[linda_deep] |
461 | for k, _ in pairs( t1) do | 454 | if not t1 then |
462 | if k ~= linda_deep then | 455 | t1 = { [linda_deep] = linda} -- proxy to use the Linda |
463 | empty = false | 456 | collection[linda_deep] = t1 |
464 | break | ||
465 | end | ||
466 | end | ||
467 | if empty then | ||
468 | collection[linda_deep] = nil | ||
469 | end | 457 | end |
470 | 458 | ||
471 | -- Note: any unread timer value is left at 'linda[key]' intensionally; | 459 | if wakeup_at == nil then |
472 | -- clearing a timer just stops it. | 460 | -- Clear the timer |
473 | else | 461 | -- |
474 | -- New timer or changing the timings | 462 | t1[key]= nil |
475 | -- | ||
476 | local t2 = t1[key] | ||
477 | if not t2 then | ||
478 | t2= {} | ||
479 | t1[key]= t2 | ||
480 | end | ||
481 | 463 | ||
482 | t2[1] = wakeup_at | 464 | -- Remove empty tables from collection; speeds timer checks and |
483 | t2[2] = period -- can be 'nil' | 465 | -- lets our 'safety reference' proxy be gc:ed as well. |
484 | end | 466 | -- |
485 | end -- set_timer() | 467 | local empty = true |
468 | for k, _ in pairs( t1) do | ||
469 | if k ~= linda_deep then | ||
470 | empty = false | ||
471 | break | ||
472 | end | ||
473 | end | ||
474 | if empty then | ||
475 | collection[linda_deep] = nil | ||
476 | end | ||
486 | 477 | ||
487 | ----- | 478 | -- Note: any unread timer value is left at 'linda[key]' intensionally; |
488 | -- [next_wakeup_at]= check_timers() | 479 | -- clearing a timer just stops it. |
489 | -- Check timers, and wake up the ones expired (if any) | 480 | else |
490 | -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none). | 481 | -- New timer or changing the timings |
491 | local check_timers = function() | ||
492 | local now = now_secs() | ||
493 | local next_wakeup | ||
494 | |||
495 | for linda_deep,t1 in pairs(collection) do | ||
496 | for key,t2 in pairs(t1) do | ||
497 | -- | 482 | -- |
498 | if key==linda_deep then | 483 | local t2 = t1[key] |
499 | -- no 'continue' in Lua :/ | 484 | if not t2 then |
500 | else | 485 | t2= {} |
501 | -- 't2': { wakeup_at_secs [,period_secs] } | 486 | t1[key]= t2 |
487 | end | ||
488 | |||
489 | t2[1] = wakeup_at | ||
490 | t2[2] = period -- can be 'nil' | ||
491 | end | ||
492 | end -- set_timer() | ||
493 | |||
494 | ----- | ||
495 | -- [next_wakeup_at]= check_timers() | ||
496 | -- Check timers, and wake up the ones expired (if any) | ||
497 | -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none). | ||
498 | local check_timers = function() | ||
499 | local now = now_secs() | ||
500 | local next_wakeup | ||
501 | |||
502 | for linda_deep,t1 in pairs(collection) do | ||
503 | for key,t2 in pairs(t1) do | ||
502 | -- | 504 | -- |
503 | local wakeup_at= t2[1] | 505 | if key==linda_deep then |
504 | local period= t2[2] -- may be 'nil' | 506 | -- no 'continue' in Lua :/ |
505 | 507 | else | |
506 | if wakeup_at <= now then | 508 | -- 't2': { wakeup_at_secs [,period_secs] } |
507 | local linda= t1[linda_deep] | 509 | -- |
508 | assert(linda) | 510 | local wakeup_at= t2[1] |
509 | 511 | local period= t2[2] -- may be 'nil' | |
510 | linda:set( key, now ) | 512 | |
511 | 513 | if wakeup_at <= now then | |
512 | -- 'pairs()' allows the values to be modified (and even | 514 | local linda= t1[linda_deep] |
513 | -- removed) as far as keys are not touched | 515 | assert(linda) |
514 | 516 | ||
515 | if not period then | 517 | linda:set( key, now ) |
516 | -- one-time timer; gone | 518 | |
517 | -- | 519 | -- 'pairs()' allows the values to be modified (and even |
518 | t1[key]= nil | 520 | -- removed) as far as keys are not touched |
519 | wakeup_at= nil -- no 'continue' in Lua :/ | 521 | |
520 | else | 522 | if not period then |
521 | -- repeating timer; find next wakeup (may jump multiple repeats) | 523 | -- one-time timer; gone |
522 | -- | 524 | -- |
523 | repeat | 525 | t1[key]= nil |
524 | wakeup_at= wakeup_at+period | 526 | wakeup_at= nil -- no 'continue' in Lua :/ |
525 | until wakeup_at > now | 527 | else |
526 | 528 | -- repeating timer; find next wakeup (may jump multiple repeats) | |
527 | t2[1]= wakeup_at | 529 | -- |
530 | repeat | ||
531 | wakeup_at= wakeup_at+period | ||
532 | until wakeup_at > now | ||
533 | |||
534 | t2[1]= wakeup_at | ||
535 | end | ||
528 | end | 536 | end |
529 | end | ||
530 | 537 | ||
531 | if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then | 538 | if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then |
532 | next_wakeup= wakeup_at | 539 | next_wakeup= wakeup_at |
540 | end | ||
533 | end | 541 | end |
542 | end -- t2 loop | ||
543 | end -- t1 loop | ||
544 | |||
545 | return next_wakeup -- may be 'nil' | ||
546 | end -- check_timers() | ||
547 | |||
548 | local timer_gateway_batched = timer_gateway.batched | ||
549 | set_finalizer( function( err, stk) | ||
550 | if err and type( err) ~= "userdata" then | ||
551 | WR( "LanesTimer error: "..tostring(err)) | ||
552 | --elseif type( err) == "userdata" then | ||
553 | -- WR( "LanesTimer after cancel" ) | ||
554 | --else | ||
555 | -- WR("LanesTimer finalized") | ||
556 | end | ||
557 | end) | ||
558 | while true do | ||
559 | local next_wakeup = check_timers() | ||
560 | |||
561 | -- Sleep until next timer to wake up, or a set/clear command | ||
562 | -- | ||
563 | local secs | ||
564 | if next_wakeup then | ||
565 | secs = next_wakeup - now_secs() | ||
566 | if secs < 0 then secs = 0 end | ||
567 | end | ||
568 | local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY) | ||
569 | |||
570 | if key == TGW_KEY then | ||
571 | assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer | ||
572 | local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3) | ||
573 | assert( key) | ||
574 | set_timer( what, key, wakeup_at, period and period > 0 and period or nil) | ||
575 | elseif key == TGW_QUERY then | ||
576 | if what == "get_timers" then | ||
577 | timer_gateway:send( TGW_REPLY, get_timers()) | ||
578 | else | ||
579 | timer_gateway:send( TGW_REPLY, "unknown query " .. what) | ||
534 | end | 580 | end |
535 | end -- t2 loop | 581 | --elseif secs == nil then -- got no value while block-waiting? |
536 | end -- t1 loop | 582 | -- WR( "timer lane: no linda, aborted?") |
537 | 583 | end | |
538 | return next_wakeup -- may be 'nil' | ||
539 | end -- check_timers() | ||
540 | |||
541 | local timer_gateway_batched = timer_gateway.batched | ||
542 | set_finalizer( function( err, stk) | ||
543 | if err and type( err) ~= "userdata" then | ||
544 | WR( "LanesTimer error: "..tostring(err)) | ||
545 | --elseif type( err) == "userdata" then | ||
546 | -- WR( "LanesTimer after cancel" ) | ||
547 | --else | ||
548 | -- WR("LanesTimer finalized") | ||
549 | end | 584 | end |
550 | end) | 585 | end -- timer_body() |
551 | while true do | 586 | timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility... |
552 | local next_wakeup = check_timers() | 587 | end -- first_time |
553 | 588 | ||
554 | -- Sleep until next timer to wake up, or a set/clear command | 589 | ----- |
590 | -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] ) | ||
591 | -- | ||
592 | -- PUBLIC LANES API | ||
593 | timer = function( linda, key, a, period ) | ||
594 | if getmetatable( linda) ~= "Linda" then | ||
595 | error "expecting a Linda" | ||
596 | end | ||
597 | if a == 0.0 then | ||
598 | -- Caller expects to get current time stamp in Linda, on return | ||
599 | -- (like the timer had expired instantly); it would be good to set this | ||
600 | -- as late as possible (to give most current time) but also we want it | ||
601 | -- to precede any possible timers that might start striking. | ||
555 | -- | 602 | -- |
556 | local secs | 603 | linda:set( key, now_secs()) |
557 | if next_wakeup then | 604 | |
558 | secs = next_wakeup - now_secs() | 605 | if not period or period==0.0 then |
559 | if secs < 0 then secs = 0 end | 606 | timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer |
560 | end | 607 | return -- nothing more to do |
561 | local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY) | ||
562 | |||
563 | if key == TGW_KEY then | ||
564 | assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer | ||
565 | local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3) | ||
566 | assert( key) | ||
567 | set_timer( what, key, wakeup_at, period and period > 0 and period or nil) | ||
568 | elseif key == TGW_QUERY then | ||
569 | if what == "get_timers" then | ||
570 | timer_gateway:send( TGW_REPLY, get_timers()) | ||
571 | else | ||
572 | timer_gateway:send( TGW_REPLY, "unknown query " .. what) | ||
573 | end | ||
574 | --elseif secs == nil then -- got no value while block-waiting? | ||
575 | -- WR( "timer lane: no linda, aborted?") | ||
576 | end | 608 | end |
609 | a= period | ||
577 | end | 610 | end |
578 | end -- timer_body() | ||
579 | timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility... | ||
580 | end -- first_time | ||
581 | 611 | ||
582 | ----- | 612 | local wakeup_at= type(a)=="table" and wakeup_conv(a) -- given point of time |
583 | -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] ) | 613 | or (a and now_secs()+a or nil) |
584 | -- | 614 | -- queue to timer |
585 | -- PUBLIC LANES API | ||
586 | timer = function( linda, key, a, period ) | ||
587 | if getmetatable( linda) ~= "Linda" then | ||
588 | error "expecting a Linda" | ||
589 | end | ||
590 | if a == 0.0 then | ||
591 | -- Caller expects to get current time stamp in Linda, on return | ||
592 | -- (like the timer had expired instantly); it would be good to set this | ||
593 | -- as late as possible (to give most current time) but also we want it | ||
594 | -- to precede any possible timers that might start striking. | ||
595 | -- | 615 | -- |
596 | linda:set( key, core.now_secs()) | 616 | timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period ) |
617 | end -- timer() | ||
597 | 618 | ||
598 | if not period or period==0.0 then | 619 | ----- |
599 | timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer | 620 | -- {[{linda, slot, when, period}[,...]]} = timers() |
600 | return -- nothing more to do | ||
601 | end | ||
602 | a= period | ||
603 | end | ||
604 | |||
605 | local wakeup_at= type(a)=="table" and core.wakeup_conv(a) -- given point of time | ||
606 | or (a and core.now_secs()+a or nil) | ||
607 | -- queue to timer | ||
608 | -- | 621 | -- |
609 | timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period ) | 622 | -- PUBLIC LANES API |
610 | end | 623 | timers = function() |
611 | 624 | timer_gateway:send( TGW_QUERY, "get_timers") | |
612 | ----- | 625 | local _, r = timer_gateway:receive( TGW_REPLY) |
613 | -- {[{linda, slot, when, period}[,...]]} = timers() | 626 | return r |
614 | -- | 627 | end -- timers() |
615 | -- PUBLIC LANES API | ||
616 | timers = function() | ||
617 | timer_gateway:send( TGW_QUERY, "get_timers") | ||
618 | local _, r = timer_gateway:receive( TGW_REPLY) | ||
619 | return r | ||
620 | end | ||
621 | 628 | ||
622 | end -- settings.with_timers | 629 | end -- settings.with_timers |
623 | 630 | ||