aboutsummaryrefslogtreecommitdiff
path: root/src/lanes.lua
diff options
context:
space:
mode:
Diffstat (limited to 'src/lanes.lua')
-rw-r--r--src/lanes.lua449
1 files changed, 228 insertions, 221 deletions
diff --git a/src/lanes.lua b/src/lanes.lua
index b4c0070..49900f9 100644
--- a/src/lanes.lua
+++ b/src/lanes.lua
@@ -70,6 +70,7 @@ lanes.configure = function( settings_)
70 local default_params = 70 local default_params =
71 { 71 {
72 nb_keepers = 1, 72 nb_keepers = 1,
73 keepers_gc_threshold = -1,
73 on_state_create = nil, 74 on_state_create = nil,
74 shutdown_timeout = 0.25, 75 shutdown_timeout = 0.25,
75 with_timers = true, 76 with_timers = true,
@@ -91,6 +92,10 @@ lanes.configure = function( settings_)
91 -- nb_keepers should be a number > 0 92 -- nb_keepers should be a number > 0
92 return type( val_) == "number" and val_ > 0 93 return type( val_) == "number" and val_ > 0
93 end, 94 end,
95 keepers_gc_threshold = function( val_)
96 -- keepers_gc_threshold should be a number
97 return type( val_) == "number"
98 end,
94 with_timers = boolean_param_checker, 99 with_timers = boolean_param_checker,
95 allocator = function( val_) 100 allocator = function( val_)
96 -- can be nil, "protected", or a function 101 -- can be nil, "protected", or a function
@@ -363,261 +368,263 @@ lanes.configure = function( settings_)
363 368
364 if settings.with_timers ~= false then 369 if settings.with_timers ~= false then
365 370
366 -- 371 --
367 -- On first 'require "lanes"', a timer lane is spawned that will maintain 372 -- On first 'require "lanes"', a timer lane is spawned that will maintain
368 -- timer tables and sleep in between the timer events. All interaction with 373 -- timer tables and sleep in between the timer events. All interaction with
369 -- the timer lane happens via a 'timer_gateway' Linda, which is common to 374 -- the timer lane happens via a 'timer_gateway' Linda, which is common to
370 -- all that 'require "lanes"'. 375 -- all that 'require "lanes"'.
371 -- 376 --
372 -- Linda protocol to timer lane: 377 -- Linda protocol to timer lane:
373 -- 378 --
374 -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs] 379 -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs]
375 -- 380 --
376 local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging 381 local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging
377 local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)" 382 local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)"
378 local first_time_key= "first time" 383 local first_time_key= "first time"
379
380 local first_time = timer_gateway:get( first_time_key) == nil
381 timer_gateway:set( first_time_key, true)
382 384
383 -- 385 local first_time = timer_gateway:get( first_time_key) == nil
384 -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally 386 timer_gateway:set( first_time_key, true)
385 -- has 'table' always declared)
386 --
387 if first_time then
388 387
389 local now_secs = core.now_secs 388 local now_secs = core.now_secs
390 assert( type( now_secs) == "function") 389 local wakeup_conv = core.wakeup_conv
391 ----- 390
392 -- Snore loop (run as a lane on the background)
393 --
394 -- High priority, to get trustworthy timings.
395 -- 391 --
396 -- We let the timer lane be a "free running" thread; no handle to it 392 -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally
397 -- remains. 393 -- has 'table' always declared)
398 -- 394 --
399 local timer_body = function() 395 if first_time then
400 set_debug_threadname( "LanesTimer") 396
401 -- 397 assert( type( now_secs) == "function")
402 -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h, 398 -----
403 -- [key]= { wakeup_secs [,period_secs] } [, ...] }, 399 -- Snore loop (run as a lane on the background)
404 -- }
405 --
406 -- Collection of all running timers, indexed with linda's & key.
407 -- 400 --
408 -- Note that we need to use the deep lightuserdata identifiers, instead 401 -- High priority, to get trustworthy timings.
409 -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple
410 -- entries for the same timer.
411 -- 402 --
412 -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but 403 -- We let the timer lane be a "free running" thread; no handle to it
413 -- also important to keep the Linda alive, even if all outside world threw 404 -- remains.
414 -- away pointers to it (which would ruin uniqueness of the deep pointer).
415 -- Now we're safe.
416 -- 405 --
417 local collection = {} 406 local timer_body = function()
418 local table_insert = assert( table.insert) 407 set_debug_threadname( "LanesTimer")
419 408 --
420 local get_timers = function() 409 -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h,
421 local r = {} 410 -- [key]= { wakeup_secs [,period_secs] } [, ...] },
422 for deep, t in pairs( collection) do 411 -- }
423 -- WR( tostring( deep)) 412 --
424 local l = t[deep] 413 -- Collection of all running timers, indexed with linda's & key.
425 for key, timer_data in pairs( t) do 414 --
426 if key ~= deep then 415 -- Note that we need to use the deep lightuserdata identifiers, instead
427 table_insert( r, {l, key, timer_data}) 416 -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple
417 -- entries for the same timer.
418 --
419 -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but
420 -- also important to keep the Linda alive, even if all outside world threw
421 -- away pointers to it (which would ruin uniqueness of the deep pointer).
422 -- Now we're safe.
423 --
424 local collection = {}
425 local table_insert = assert( table.insert)
426
427 local get_timers = function()
428 local r = {}
429 for deep, t in pairs( collection) do
430 -- WR( tostring( deep))
431 local l = t[deep]
432 for key, timer_data in pairs( t) do
433 if key ~= deep then
434 table_insert( r, {l, key, timer_data})
435 end
428 end 436 end
429 end 437 end
430 end 438 return r
431 return r 439 end -- get_timers()
432 end -- get_timers()
433
434 --
435 -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] )
436 --
437 local set_timer = function( linda, key, wakeup_at, period)
438 assert( wakeup_at == nil or wakeup_at > 0.0)
439 assert( period == nil or period > 0.0)
440
441 local linda_deep = linda:deep()
442 assert( linda_deep)
443 440
444 -- Find or make a lookup for this timer
445 -- 441 --
446 local t1 = collection[linda_deep] 442 -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] )
447 if not t1 then 443 --
448 t1 = { [linda_deep] = linda} -- proxy to use the Linda 444 local set_timer = function( linda, key, wakeup_at, period)
449 collection[linda_deep] = t1 445 assert( wakeup_at == nil or wakeup_at > 0.0)
450 end 446 assert( period == nil or period > 0.0)
451 447
452 if wakeup_at == nil then 448 local linda_deep = linda:deep()
453 -- Clear the timer 449 assert( linda_deep)
454 --
455 t1[key]= nil
456 450
457 -- Remove empty tables from collection; speeds timer checks and 451 -- Find or make a lookup for this timer
458 -- lets our 'safety reference' proxy be gc:ed as well.
459 -- 452 --
460 local empty = true 453 local t1 = collection[linda_deep]
461 for k, _ in pairs( t1) do 454 if not t1 then
462 if k ~= linda_deep then 455 t1 = { [linda_deep] = linda} -- proxy to use the Linda
463 empty = false 456 collection[linda_deep] = t1
464 break
465 end
466 end
467 if empty then
468 collection[linda_deep] = nil
469 end 457 end
470 458
471 -- Note: any unread timer value is left at 'linda[key]' intensionally; 459 if wakeup_at == nil then
472 -- clearing a timer just stops it. 460 -- Clear the timer
473 else 461 --
474 -- New timer or changing the timings 462 t1[key]= nil
475 --
476 local t2 = t1[key]
477 if not t2 then
478 t2= {}
479 t1[key]= t2
480 end
481 463
482 t2[1] = wakeup_at 464 -- Remove empty tables from collection; speeds timer checks and
483 t2[2] = period -- can be 'nil' 465 -- lets our 'safety reference' proxy be gc:ed as well.
484 end 466 --
485 end -- set_timer() 467 local empty = true
468 for k, _ in pairs( t1) do
469 if k ~= linda_deep then
470 empty = false
471 break
472 end
473 end
474 if empty then
475 collection[linda_deep] = nil
476 end
486 477
487 ----- 478 -- Note: any unread timer value is left at 'linda[key]' intensionally;
488 -- [next_wakeup_at]= check_timers() 479 -- clearing a timer just stops it.
489 -- Check timers, and wake up the ones expired (if any) 480 else
490 -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none). 481 -- New timer or changing the timings
491 local check_timers = function()
492 local now = now_secs()
493 local next_wakeup
494
495 for linda_deep,t1 in pairs(collection) do
496 for key,t2 in pairs(t1) do
497 -- 482 --
498 if key==linda_deep then 483 local t2 = t1[key]
499 -- no 'continue' in Lua :/ 484 if not t2 then
500 else 485 t2= {}
501 -- 't2': { wakeup_at_secs [,period_secs] } 486 t1[key]= t2
487 end
488
489 t2[1] = wakeup_at
490 t2[2] = period -- can be 'nil'
491 end
492 end -- set_timer()
493
494 -----
495 -- [next_wakeup_at]= check_timers()
496 -- Check timers, and wake up the ones expired (if any)
497 -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none).
498 local check_timers = function()
499 local now = now_secs()
500 local next_wakeup
501
502 for linda_deep,t1 in pairs(collection) do
503 for key,t2 in pairs(t1) do
502 -- 504 --
503 local wakeup_at= t2[1] 505 if key==linda_deep then
504 local period= t2[2] -- may be 'nil' 506 -- no 'continue' in Lua :/
505 507 else
506 if wakeup_at <= now then 508 -- 't2': { wakeup_at_secs [,period_secs] }
507 local linda= t1[linda_deep] 509 --
508 assert(linda) 510 local wakeup_at= t2[1]
509 511 local period= t2[2] -- may be 'nil'
510 linda:set( key, now ) 512
511 513 if wakeup_at <= now then
512 -- 'pairs()' allows the values to be modified (and even 514 local linda= t1[linda_deep]
513 -- removed) as far as keys are not touched 515 assert(linda)
514 516
515 if not period then 517 linda:set( key, now )
516 -- one-time timer; gone 518
517 -- 519 -- 'pairs()' allows the values to be modified (and even
518 t1[key]= nil 520 -- removed) as far as keys are not touched
519 wakeup_at= nil -- no 'continue' in Lua :/ 521
520 else 522 if not period then
521 -- repeating timer; find next wakeup (may jump multiple repeats) 523 -- one-time timer; gone
522 -- 524 --
523 repeat 525 t1[key]= nil
524 wakeup_at= wakeup_at+period 526 wakeup_at= nil -- no 'continue' in Lua :/
525 until wakeup_at > now 527 else
526 528 -- repeating timer; find next wakeup (may jump multiple repeats)
527 t2[1]= wakeup_at 529 --
530 repeat
531 wakeup_at= wakeup_at+period
532 until wakeup_at > now
533
534 t2[1]= wakeup_at
535 end
528 end 536 end
529 end
530 537
531 if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then 538 if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then
532 next_wakeup= wakeup_at 539 next_wakeup= wakeup_at
540 end
533 end 541 end
542 end -- t2 loop
543 end -- t1 loop
544
545 return next_wakeup -- may be 'nil'
546 end -- check_timers()
547
548 local timer_gateway_batched = timer_gateway.batched
549 set_finalizer( function( err, stk)
550 if err and type( err) ~= "userdata" then
551 WR( "LanesTimer error: "..tostring(err))
552 --elseif type( err) == "userdata" then
553 -- WR( "LanesTimer after cancel" )
554 --else
555 -- WR("LanesTimer finalized")
556 end
557 end)
558 while true do
559 local next_wakeup = check_timers()
560
561 -- Sleep until next timer to wake up, or a set/clear command
562 --
563 local secs
564 if next_wakeup then
565 secs = next_wakeup - now_secs()
566 if secs < 0 then secs = 0 end
567 end
568 local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY)
569
570 if key == TGW_KEY then
571 assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer
572 local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3)
573 assert( key)
574 set_timer( what, key, wakeup_at, period and period > 0 and period or nil)
575 elseif key == TGW_QUERY then
576 if what == "get_timers" then
577 timer_gateway:send( TGW_REPLY, get_timers())
578 else
579 timer_gateway:send( TGW_REPLY, "unknown query " .. what)
534 end 580 end
535 end -- t2 loop 581 --elseif secs == nil then -- got no value while block-waiting?
536 end -- t1 loop 582 -- WR( "timer lane: no linda, aborted?")
537 583 end
538 return next_wakeup -- may be 'nil'
539 end -- check_timers()
540
541 local timer_gateway_batched = timer_gateway.batched
542 set_finalizer( function( err, stk)
543 if err and type( err) ~= "userdata" then
544 WR( "LanesTimer error: "..tostring(err))
545 --elseif type( err) == "userdata" then
546 -- WR( "LanesTimer after cancel" )
547 --else
548 -- WR("LanesTimer finalized")
549 end 584 end
550 end) 585 end -- timer_body()
551 while true do 586 timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility...
552 local next_wakeup = check_timers() 587 end -- first_time
553 588
554 -- Sleep until next timer to wake up, or a set/clear command 589 -----
590 -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] )
591 --
592 -- PUBLIC LANES API
593 timer = function( linda, key, a, period )
594 if getmetatable( linda) ~= "Linda" then
595 error "expecting a Linda"
596 end
597 if a == 0.0 then
598 -- Caller expects to get current time stamp in Linda, on return
599 -- (like the timer had expired instantly); it would be good to set this
600 -- as late as possible (to give most current time) but also we want it
601 -- to precede any possible timers that might start striking.
555 -- 602 --
556 local secs 603 linda:set( key, now_secs())
557 if next_wakeup then 604
558 secs = next_wakeup - now_secs() 605 if not period or period==0.0 then
559 if secs < 0 then secs = 0 end 606 timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer
560 end 607 return -- nothing more to do
561 local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY)
562
563 if key == TGW_KEY then
564 assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer
565 local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3)
566 assert( key)
567 set_timer( what, key, wakeup_at, period and period > 0 and period or nil)
568 elseif key == TGW_QUERY then
569 if what == "get_timers" then
570 timer_gateway:send( TGW_REPLY, get_timers())
571 else
572 timer_gateway:send( TGW_REPLY, "unknown query " .. what)
573 end
574 --elseif secs == nil then -- got no value while block-waiting?
575 -- WR( "timer lane: no linda, aborted?")
576 end 608 end
609 a= period
577 end 610 end
578 end -- timer_body()
579 timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility...
580 end -- first_time
581 611
582 ----- 612 local wakeup_at= type(a)=="table" and wakeup_conv(a) -- given point of time
583 -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] ) 613 or (a and now_secs()+a or nil)
584 -- 614 -- queue to timer
585 -- PUBLIC LANES API
586 timer = function( linda, key, a, period )
587 if getmetatable( linda) ~= "Linda" then
588 error "expecting a Linda"
589 end
590 if a == 0.0 then
591 -- Caller expects to get current time stamp in Linda, on return
592 -- (like the timer had expired instantly); it would be good to set this
593 -- as late as possible (to give most current time) but also we want it
594 -- to precede any possible timers that might start striking.
595 -- 615 --
596 linda:set( key, core.now_secs()) 616 timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period )
617 end -- timer()
597 618
598 if not period or period==0.0 then 619 -----
599 timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer 620 -- {[{linda, slot, when, period}[,...]]} = timers()
600 return -- nothing more to do
601 end
602 a= period
603 end
604
605 local wakeup_at= type(a)=="table" and core.wakeup_conv(a) -- given point of time
606 or (a and core.now_secs()+a or nil)
607 -- queue to timer
608 -- 621 --
609 timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period ) 622 -- PUBLIC LANES API
610 end 623 timers = function()
611 624 timer_gateway:send( TGW_QUERY, "get_timers")
612 ----- 625 local _, r = timer_gateway:receive( TGW_REPLY)
613 -- {[{linda, slot, when, period}[,...]]} = timers() 626 return r
614 -- 627 end -- timers()
615 -- PUBLIC LANES API
616 timers = function()
617 timer_gateway:send( TGW_QUERY, "get_timers")
618 local _, r = timer_gateway:receive( TGW_REPLY)
619 return r
620 end
621 628
622 end -- settings.with_timers 629 end -- settings.with_timers
623 630