aboutsummaryrefslogtreecommitdiff
path: root/src/lanes.lua
diff options
context:
space:
mode:
Diffstat (limited to 'src/lanes.lua')
-rw-r--r--src/lanes.lua611
1 files changed, 611 insertions, 0 deletions
diff --git a/src/lanes.lua b/src/lanes.lua
new file mode 100644
index 0000000..c68506d
--- /dev/null
+++ b/src/lanes.lua
@@ -0,0 +1,611 @@
1--
2-- LANES.LUA
3--
4-- Multithreading and -core support for Lua
5--
6-- Author: Asko Kauppi <akauppi@gmail.com>
7--
8-- History:
9-- Jun-08 AKa: major revise
10-- 15-May-07 AKa: pthread_join():less version, some speedup & ability to
11-- handle more threads (~ 8000-9000, up from ~ 5000)
12-- 26-Feb-07 AKa: serialization working (C side)
13-- 17-Sep-06 AKa: started the module (serialization)
14--
15--[[
16===============================================================================
17
18Copyright (C) 2007-08 Asko Kauppi <akauppi@gmail.com>
19
20Permission is hereby granted, free of charge, to any person obtaining a copy
21of this software and associated documentation files (the "Software"), to deal
22in the Software without restriction, including without limitation the rights
23to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
24copies of the Software, and to permit persons to whom the Software is
25furnished to do so, subject to the following conditions:
26
27The above copyright notice and this permission notice shall be included in
28all copies or substantial portions of the Software.
29
30THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
31IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
32FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
33AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
34LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
35OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
36THE SOFTWARE.
37
38===============================================================================
39]]--
40
41module( "lanes", package.seeall )
42
43require "lua51-lanes"
44assert( type(lanes)=="table" )
45
46local mm= lanes
47
48local linda_id= assert( mm.linda_id )
49
50local thread_new= assert(mm.thread_new)
51local thread_status= assert(mm.thread_status)
52local thread_join= assert(mm.thread_join)
53local thread_cancel= assert(mm.thread_cancel)
54
55local _single= assert(mm._single)
56local _version= assert(mm._version)
57
58local _deep_userdata= assert(mm._deep_userdata)
59
60local now_secs= assert( mm.now_secs )
61local wakeup_conv= assert( mm.wakeup_conv )
62local timer_gateway= assert( mm.timer_gateway )
63
64local max_prio= assert( mm.max_prio )
65
66-- This check is for sublanes requiring Lanes
67--
68-- TBD: We could also have the C level expose 'string.gmatch' for us. But this is simpler.
69--
70if not string then
71 error( "To use 'lanes', you will also need to have 'string' available.", 2 )
72end
73
74--
75-- Cache globals for code that might run under sandboxing
76--
77local assert= assert
78local string_gmatch= assert( string.gmatch )
79local select= assert( select )
80local type= assert( type )
81local pairs= assert( pairs )
82local tostring= assert( tostring )
83local error= assert( error )
84local setmetatable= assert( setmetatable )
85local rawget= assert( rawget )
86
87ABOUT=
88{
89 author= "Asko Kauppi <akauppi@gmail.com>",
90 description= "Running multiple Lua states in parallel",
91 license= "MIT/X11",
92 copyright= "Copyright (c) 2007-08, Asko Kauppi",
93 version= _version,
94}
95
96
97-- Making copies of necessary system libs will pass them on as upvalues;
98-- only the first state doing "require 'lanes'" will need to have 'string'
99-- and 'table' visible.
100--
101local function WR(str)
102 io.stderr:write( str.."\n" )
103end
104
105local function DUMP( tbl )
106 if not tbl then return end
107 local str=""
108 for k,v in pairs(tbl) do
109 str= str..k.."="..tostring(v).."\n"
110 end
111 WR(str)
112end
113
114
115---=== Laning ===---
116
117-- lane_h[1..n]: lane results, same as via 'lane_h:join()'
118-- lane_h[0]: can be read to make sure a thread has finished (always gives 'true')
119-- lane_h[-1]: error message, without propagating the error
120--
121-- Reading a Lane result (or [0]) propagates a possible error in the lane
122-- (and execution does not return). Cancelled lanes give 'nil' values.
123--
124-- lane_h.state: "pending"/"running"/"waiting"/"done"/"error"/"cancelled"
125--
126local lane_mt= {
127 __index= function( me, k )
128 if type(k) == "number" then
129 -- 'me[0]=true' marks we've already taken in the results
130 --
131 if not rawget( me, 0 ) then
132 -- Wait indefinately; either propagates an error or
133 -- returns the return values
134 --
135 me[0]= true -- marker, even on errors
136
137 local t= { thread_join(me._ud) } -- wait indefinate
138 --
139 -- { ... } "done": regular return, 0..N results
140 -- { } "cancelled"
141 -- { nil, err_str, stack_tbl } "error"
142
143 local st= thread_status(me._ud)
144 if st=="done" then
145 -- Use 'pairs' and not 'ipairs' so that nil holes in
146 -- the returned values are tolerated.
147 --
148 for i,v in pairs(t) do
149 me[i]= v
150 end
151 elseif st=="error" then
152 assert( t[1]==nil and t[2] and type(t[3])=="table" )
153 me[-1]= t[2]
154 -- me[-2] could carry the stack table, but even
155 -- me[-1] is rather unnecessary (and undocumented);
156 -- use ':join()' instead. --AKa 22-Jan-2009
157 elseif st=="cancelled" then
158 -- do nothing
159 else
160 error( "Unexpected status: "..st )
161 end
162 end
163
164 -- Check errors even if we'd first peeked them via [-1]
165 -- and then came for the actual results.
166 --
167 local err= rawget(me, -1)
168 if err~=nil and k~=-1 then
169 -- Note: Lua 5.1 interpreter is not prepared to show
170 -- non-string errors, so we use 'tostring()' here
171 -- to get meaningful output. --AKa 22-Jan-2009
172 --
173 -- Also, the stack dump we get is no good; it only
174 -- lists our internal Lanes functions. There seems
175 -- to be no way to switch it off, though.
176
177 -- Level 3 should show the line where 'h[x]' was read
178 -- but this only seems to work for string messages
179 -- (Lua 5.1.4). No idea, why. --AKa 22-Jan-2009
180 --
181 error( tostring(err), 3 ) -- level 3 should show the line where 'h[x]' was read
182 end
183 return rawget( me, k )
184 --
185 elseif k=="status" then -- me.status
186 return thread_status(me._ud)
187 --
188 else
189 error( "Unknown key: "..k )
190 end
191 end
192 }
193
194-----
195-- h= lanes.gen( [libs_str|opt_tbl [, ...],] lane_func ) ( [...] )
196--
197-- 'libs': nil: no libraries available (default)
198-- "": only base library ('assert', 'print', 'unpack' etc.)
199-- "math,os": math + os + base libraries (named ones + base)
200-- "*": all standard libraries available
201--
202-- 'opt': .priority: int (-2..+2) smaller is lower priority (0 = default)
203--
204-- .cancelstep: bool | uint
205-- false: cancellation check only at pending Linda operations
206-- (send/receive) so no runtime performance penalty (default)
207-- true: adequate cancellation check (same as 100)
208-- >0: cancellation check every x Lua lines (small number= faster
209-- reaction but more performance overhead)
210--
211-- .globals: table of globals to set for a new thread (passed by value)
212--
213-- ... (more options may be introduced later) ...
214--
215-- Calling with a function parameter ('lane_func') ends the string/table
216-- modifiers, and prepares a lane generator. One can either finish here,
217-- and call the generator later (maybe multiple times, with different parameters)
218-- or add on actual thread arguments to also ignite the thread on the same call.
219--
220local lane_proxy
221
222local valid_libs= {
223 ["package"]= true,
224 ["table"]= true,
225 ["io"]= true,
226 ["os"]= true,
227 ["string"]= true,
228 ["math"]= true,
229 ["debug"]= true,
230 --
231 ["base"]= true,
232 ["coroutine"]= true,
233 ["*"]= true
234}
235
236function gen( ... )
237 local opt= {}
238 local libs= nil
239 local lev= 2 -- level for errors
240
241 local n= select('#',...)
242
243 if n==0 then
244 error( "No parameters!" )
245 end
246
247 for i=1,n-1 do
248 local v= select(i,...)
249 if type(v)=="string" then
250 libs= libs and libs..","..v or v
251 elseif type(v)=="table" then
252 for k,vv in pairs(v) do
253 opt[k]= vv
254 end
255 elseif v==nil then
256 -- skip
257 else
258 error( "Bad parameter: "..tostring(v) )
259 end
260 end
261
262 local func= select(n,...)
263 if type(func)~="function" then
264 error( "Last parameter not function: "..tostring(func) )
265 end
266
267 -- Check 'libs' already here, so the error goes in the right place
268 -- (otherwise will be noticed only once the generator is called)
269 --
270 if libs then
271 for s in string_gmatch(libs, "[%a*]+") do
272 if not valid_libs[s] then
273 error( "Bad library name: "..s )
274 end
275 end
276 end
277
278 local prio, cs, g_tbl
279
280 for k,v in pairs(opt) do
281 if k=="priority" then prio= v
282 elseif k=="cancelstep" then cs= (v==true) and 100 or
283 (v==false) and 0 or
284 type(v)=="number" and v or
285 error( "Bad cancelstep: "..tostring(v), lev )
286 elseif k=="globals" then g_tbl= v
287 --..
288 elseif k==1 then error( "unkeyed option: ".. tostring(v), lev )
289 else error( "Bad option: ".. tostring(k), lev )
290 end
291 end
292
293 -- Lane generator
294 --
295 return function(...)
296 return lane_proxy( thread_new( func, libs, cs, prio, g_tbl,
297 ... ) ) -- args
298 end
299end
300
301lane_proxy= function( ud )
302 local proxy= {
303 _ud= ud,
304
305 -- void= me:cancel()
306 --
307 cancel= function(me) thread_cancel(me._ud) end,
308
309 -- [...] | [nil,err,stack_tbl]= me:join( [wait_secs=-1] )
310 --
311 join= function( me, wait )
312 return thread_join( me._ud, wait )
313 end,
314 }
315 assert( proxy._ud )
316 setmetatable( proxy, lane_mt )
317
318 return proxy
319end
320
321
322---=== Lindas ===---
323
324-- We let the C code attach methods to userdata directly
325
326-----
327-- linda_ud= lanes.linda()
328--
329function linda()
330 local proxy= _deep_userdata( linda_id )
331 assert( (type(proxy) == "userdata") and getmetatable(proxy) )
332 return proxy
333end
334
335
336---=== Timers ===---
337
338--
339-- On first 'require "lanes"', a timer lane is spawned that will maintain
340-- timer tables and sleep in between the timer events. All interaction with
341-- the timer lane happens via a 'timer_gateway' Linda, which is common to
342-- all that 'require "lanes"'.
343--
344-- Linda protocol to timer lane:
345--
346-- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs]
347--
348local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging
349local first_time_key= "first time"
350
351local first_time= timer_gateway:get(first_time_key) == nil
352timer_gateway:set(first_time_key,true)
353
354--
355-- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally
356-- has 'table' always declared)
357--
358if first_time then
359 local table_remove= assert( table.remove )
360 local table_insert= assert( table.insert )
361
362 --
363 -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h,
364 -- [key]= { wakeup_secs [,period_secs] } [, ...] },
365 -- }
366 --
367 -- Collection of all running timers, indexed with linda's & key.
368 --
369 -- Note that we need to use the deep lightuserdata identifiers, instead
370 -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple
371 -- entries for the same timer.
372 --
373 -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but
374 -- also important to keep the Linda alive, even if all outside world threw
375 -- away pointers to it (which would ruin uniqueness of the deep pointer).
376 -- Now we're safe.
377 --
378 local collection= {}
379
380 --
381 -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] )
382 --
383 local function set_timer( linda, key, wakeup_at, period )
384
385 assert( wakeup_at==nil or wakeup_at>0.0 )
386 assert( period==nil or period>0.0 )
387
388 local linda_deep= linda:deep()
389 assert( linda_deep )
390
391 -- Find or make a lookup for this timer
392 --
393 local t1= collection[linda_deep]
394 if not t1 then
395 t1= { [linda_deep]= linda } -- proxy to use the Linda
396 collection[linda_deep]= t1
397 end
398
399 if wakeup_at==nil then
400 -- Clear the timer
401 --
402 t1[key]= nil
403
404 -- Remove empty tables from collection; speeds timer checks and
405 -- lets our 'safety reference' proxy be gc:ed as well.
406 --
407 local empty= true
408 for k,_ in pairs(t1) do
409 if k~= linda_deep then
410 empty= false; break
411 end
412 end
413 if empty then
414 collection[linda_deep]= nil
415 end
416
417 -- Note: any unread timer value is left at 'linda[key]' intensionally;
418 -- clearing a timer just stops it.
419 else
420 -- New timer or changing the timings
421 --
422 local t2= t1[key]
423 if not t2 then
424 t2= {}; t1[key]= t2
425 end
426
427 t2[1]= wakeup_at
428 t2[2]= period -- can be 'nil'
429 end
430 end
431
432 -----
433 -- [next_wakeup_at]= check_timers()
434 --
435 -- Check timers, and wake up the ones expired (if any)
436 --
437 -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none).
438 --
439 local function check_timers()
440
441 local now= now_secs()
442 local next_wakeup
443
444 for linda_deep,t1 in pairs(collection) do
445 for key,t2 in pairs(t1) do
446 --
447 if key==linda_deep then
448 -- no 'continue' in Lua :/
449 else
450 -- 't2': { wakeup_at_secs [,period_secs] }
451 --
452 local wakeup_at= t2[1]
453 local period= t2[2] -- may be 'nil'
454
455 if wakeup_at <= now then
456 local linda= t1[linda_deep]
457 assert(linda)
458
459 linda:set( key, now )
460
461 -- 'pairs()' allows the values to be modified (and even
462 -- removed) as far as keys are not touched
463
464 if not period then
465 -- one-time timer; gone
466 --
467 t1[key]= nil
468 wakeup_at= nil -- no 'continue' in Lua :/
469 else
470 -- repeating timer; find next wakeup (may jump multiple repeats)
471 --
472 repeat
473 wakeup_at= wakeup_at+period
474 until wakeup_at > now
475
476 t2[1]= wakeup_at
477 end
478 end
479
480 if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then
481 next_wakeup= wakeup_at
482 end
483 end
484 end -- t2 loop
485 end -- t1 loop
486
487 return next_wakeup -- may be 'nil'
488 end
489
490 -----
491 -- Snore loop (run as a lane on the background)
492 --
493 -- High priority, to get trustworthy timings.
494 --
495 -- We let the timer lane be a "free running" thread; no handle to it
496 -- remains.
497 --
498 gen( "io", { priority=max_prio }, function()
499
500 while true do
501 local next_wakeup= check_timers()
502
503 -- Sleep until next timer to wake up, or a set/clear command
504 --
505 local secs= next_wakeup and (next_wakeup - now_secs()) or nil
506 local linda= timer_gateway:receive( secs, TGW_KEY )
507
508 if linda then
509 local key= timer_gateway:receive( 0.0, TGW_KEY )
510 local wakeup_at= timer_gateway:receive( 0.0, TGW_KEY )
511 local period= timer_gateway:receive( 0.0, TGW_KEY )
512 assert( key and wakeup_at and period )
513
514 set_timer( linda, key, wakeup_at, period>0 and period or nil )
515 end
516 end
517 end )()
518end
519
520-----
521-- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] )
522--
523function timer( linda, key, a, period )
524
525 if a==0.0 then
526 -- Caller expects to get current time stamp in Linda, on return
527 -- (like the timer had expired instantly); it would be good to set this
528 -- as late as possible (to give most current time) but also we want it
529 -- to precede any possible timers that might start striking.
530 --
531 linda:set( key, now_secs() )
532
533 if not period or period==0.0 then
534 timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer
535 return -- nothing more to do
536 end
537 a= period
538 end
539
540 local wakeup_at= type(a)=="table" and wakeup_conv(a) -- given point of time
541 or now_secs()+a
542 -- queue to timer
543 --
544 timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period )
545end
546
547
548---=== Lock & atomic generators ===---
549
550-- These functions are just surface sugar, but make solutions easier to read.
551-- Not many applications should even need explicit locks or atomic counters.
552
553--
554-- lock_f= lanes.genlock( linda_h, key [,N_uint=1] )
555--
556-- = lock_f( +M ) -- acquire M
557-- ...locked...
558-- = lock_f( -M ) -- release M
559--
560-- Returns an access function that allows 'N' simultaneous entries between
561-- acquire (+M) and release (-M). For binary locks, use M==1.
562--
563function genlock( linda, key, N )
564 linda:limit(key,N)
565 linda:set(key,nil) -- clears existing data
566
567 --
568 -- [true [, ...]= trues(uint)
569 --
570 local function trues(n)
571 if n>0 then return true,trues(n-1) end
572 end
573
574 return
575 function(M)
576 if M>0 then
577 -- 'nil' timeout allows 'key' to be numeric
578 linda:send( nil, key, trues(M) ) -- suspends until been able to push them
579 else
580 for i=1,-M do
581 linda:receive( key )
582 end
583 end
584 end
585end
586
587
588--
589-- atomic_f= lanes.genatomic( linda_h, key [,initial_num=0.0] )
590--
591-- int= atomic_f( [diff_num=1.0] )
592--
593-- Returns an access function that allows atomic increment/decrement of the
594-- number in 'key'.
595--
596function genatomic( linda, key, initial_val )
597 linda:limit(key,2) -- value [,true]
598 linda:set(key,initial_val or 0.0) -- clears existing data (also queue)
599
600 return
601 function(diff)
602 -- 'nil' allows 'key' to be numeric
603 linda:send( nil, key, true ) -- suspends until our 'true' is in
604 local val= linda:get(key) + (diff or 1.0)
605 linda:set( key, val ) -- releases the lock, by emptying queue
606 return val
607 end
608end
609
610
611--the end