aboutsummaryrefslogtreecommitdiff
path: root/etc/dispatch.lua
diff options
context:
space:
mode:
Diffstat (limited to 'etc/dispatch.lua')
-rw-r--r--etc/dispatch.lua82
1 files changed, 41 insertions, 41 deletions
diff --git a/etc/dispatch.lua b/etc/dispatch.lua
index 98fa8a8..dd76d6d 100644
--- a/etc/dispatch.lua
+++ b/etc/dispatch.lua
@@ -14,7 +14,7 @@ module("dispatch")
14TIMEOUT = 60 14TIMEOUT = 60
15 15
16----------------------------------------------------------------------------- 16-----------------------------------------------------------------------------
17-- We implement 3 types of dispatchers: 17-- We implement 3 types of dispatchers:
18-- sequential 18-- sequential
19-- coroutine 19-- coroutine
20-- threaded 20-- threaded
@@ -34,7 +34,7 @@ end
34 34
35-- sequential handler simply calls the functions and doesn't wrap I/O 35-- sequential handler simply calls the functions and doesn't wrap I/O
36function handlert.sequential() 36function handlert.sequential()
37 return { 37 return {
38 tcp = socket.tcp, 38 tcp = socket.tcp,
39 start = seqstart 39 start = seqstart
40 } 40 }
@@ -55,7 +55,7 @@ function socket.protect(f)
55 if not status then 55 if not status then
56 if type(results[1]) == 'table' then 56 if type(results[1]) == 'table' then
57 return nil, results[1][1] 57 return nil, results[1][1]
58 else error(results[1]) end 58 else error(results[1]) end
59 end 59 end
60 if coroutine.status(co) == "suspended" then 60 if coroutine.status(co) == "suspended" then
61 arg = {coroutine.yield(unpack(results))} 61 arg = {coroutine.yield(unpack(results))}
@@ -84,17 +84,17 @@ local function newset()
84 if index then 84 if index then
85 reverse[value] = nil 85 reverse[value] = nil
86 local top = table.remove(set) 86 local top = table.remove(set)
87 if top ~= value then 87 if top ~= value then
88 reverse[top] = index 88 reverse[top] = index
89 set[index] = top 89 set[index] = top
90 end 90 end
91 end 91 end
92 end 92 end
93 }}) 93 }})
94end 94end
95 95
96----------------------------------------------------------------------------- 96-----------------------------------------------------------------------------
97-- socket.tcp() wrapper for the coroutine dispatcher 97-- socket.tcp() wrapper for the coroutine dispatcher
98----------------------------------------------------------------------------- 98-----------------------------------------------------------------------------
99local function cowrap(dispatcher, tcp, error) 99local function cowrap(dispatcher, tcp, error)
100 if not tcp then return nil, error end 100 if not tcp then return nil, error end
@@ -102,7 +102,7 @@ local function cowrap(dispatcher, tcp, error)
102 tcp:settimeout(0) 102 tcp:settimeout(0)
103 -- metatable for wrap produces new methods on demand for those that we 103 -- metatable for wrap produces new methods on demand for those that we
104 -- don't override explicitly. 104 -- don't override explicitly.
105 local metat = { __index = function(table, key) 105 local metat = { __index = function(table, key)
106 table[key] = function(...) 106 table[key] = function(...)
107 arg[1] = tcp 107 arg[1] = tcp
108 return tcp[key](unpack(arg)) 108 return tcp[key](unpack(arg))
@@ -112,7 +112,7 @@ local function cowrap(dispatcher, tcp, error)
112 -- does our user want to do his own non-blocking I/O? 112 -- does our user want to do his own non-blocking I/O?
113 local zero = false 113 local zero = false
114 -- create a wrap object that will behave just like a real socket object 114 -- create a wrap object that will behave just like a real socket object
115 local wrap = { } 115 local wrap = { }
116 -- we ignore settimeout to preserve our 0 timeout, but record whether 116 -- we ignore settimeout to preserve our 0 timeout, but record whether
117 -- the user wants to do his own non-blocking I/O 117 -- the user wants to do his own non-blocking I/O
118 function wrap:settimeout(value, mode) 118 function wrap:settimeout(value, mode)
@@ -121,19 +121,19 @@ local function cowrap(dispatcher, tcp, error)
121 return 1 121 return 1
122 end 122 end
123 -- send in non-blocking mode and yield on timeout 123 -- send in non-blocking mode and yield on timeout
124 function wrap:send(data, first, last) 124 function wrap:send(data, first, last)
125 first = (first or 1) - 1 125 first = (first or 1) - 1
126 local result, error 126 local result, error
127 while true do 127 while true do
128 -- return control to dispatcher and tell it we want to send 128 -- return control to dispatcher and tell it we want to send
129 -- if upon return the dispatcher tells us we timed out, 129 -- if upon return the dispatcher tells us we timed out,
130 -- return an error to whoever called us 130 -- return an error to whoever called us
131 if coroutine.yield(dispatcher.sending, tcp) == "timeout" then 131 if coroutine.yield(dispatcher.sending, tcp) == "timeout" then
132 return nil, "timeout" 132 return nil, "timeout"
133 end 133 end
134 -- try sending 134 -- try sending
135 result, error, first = tcp:send(data, first+1, last) 135 result, error, first = tcp:send(data, first+1, last)
136 -- if we are done, or there was an unexpected error, 136 -- if we are done, or there was an unexpected error,
137 -- break away from loop 137 -- break away from loop
138 if error ~= "timeout" then return result, error, first end 138 if error ~= "timeout" then return result, error, first end
139 end 139 end
@@ -143,20 +143,20 @@ local function cowrap(dispatcher, tcp, error)
143 function wrap:receive(pattern, partial) 143 function wrap:receive(pattern, partial)
144 local error = "timeout" 144 local error = "timeout"
145 local value 145 local value
146 while true do 146 while true do
147 -- return control to dispatcher and tell it we want to receive 147 -- return control to dispatcher and tell it we want to receive
148 -- if upon return the dispatcher tells us we timed out, 148 -- if upon return the dispatcher tells us we timed out,
149 -- return an error to whoever called us 149 -- return an error to whoever called us
150 if coroutine.yield(dispatcher.receiving, tcp) == "timeout" then 150 if coroutine.yield(dispatcher.receiving, tcp) == "timeout" then
151 return nil, "timeout" 151 return nil, "timeout"
152 end 152 end
153 -- try receiving 153 -- try receiving
154 value, error, partial = tcp:receive(pattern, partial) 154 value, error, partial = tcp:receive(pattern, partial)
155 -- if we are done, or there was an unexpected error, 155 -- if we are done, or there was an unexpected error,
156 -- break away from loop. also, if the user requested 156 -- break away from loop. also, if the user requested
157 -- zero timeout, return all we got 157 -- zero timeout, return all we got
158 if (error ~= "timeout") or zero then 158 if (error ~= "timeout") or zero then
159 return value, error, partial 159 return value, error, partial
160 end 160 end
161 end 161 end
162 end 162 end
@@ -168,8 +168,8 @@ local function cowrap(dispatcher, tcp, error)
168 -- connection succeeds. 168 -- connection succeeds.
169 -- if upon return the dispatcher tells us we have a 169 -- if upon return the dispatcher tells us we have a
170 -- timeout, just abort 170 -- timeout, just abort
171 if coroutine.yield(dispatcher.sending, tcp) == "timeout" then 171 if coroutine.yield(dispatcher.sending, tcp) == "timeout" then
172 return nil, "timeout" 172 return nil, "timeout"
173 end 173 end
174 -- when we come back, check if connection was successful 174 -- when we come back, check if connection was successful
175 result, error = tcp:connect(host, port) 175 result, error = tcp:connect(host, port)
@@ -179,27 +179,27 @@ local function cowrap(dispatcher, tcp, error)
179 end 179 end
180 -- accept in non-blocking mode and yield on timeout 180 -- accept in non-blocking mode and yield on timeout
181 function wrap:accept() 181 function wrap:accept()
182 while 1 do 182 while 1 do
183 -- return control to dispatcher. we will be readable when a 183 -- return control to dispatcher. we will be readable when a
184 -- connection arrives. 184 -- connection arrives.
185 -- if upon return the dispatcher tells us we have a 185 -- if upon return the dispatcher tells us we have a
186 -- timeout, just abort 186 -- timeout, just abort
187 if coroutine.yield(dispatcher.receiving, tcp) == "timeout" then 187 if coroutine.yield(dispatcher.receiving, tcp) == "timeout" then
188 return nil, "timeout" 188 return nil, "timeout"
189 end 189 end
190 local client, error = tcp:accept() 190 local client, error = tcp:accept()
191 if error ~= "timeout" then 191 if error ~= "timeout" then
192 return cowrap(dispatcher, client, error) 192 return cowrap(dispatcher, client, error)
193 end 193 end
194 end 194 end
195 end 195 end
196 -- remove cortn from context 196 -- remove cortn from context
197 function wrap:close() 197 function wrap:close()
198 dispatcher.stamp[tcp] = nil 198 dispatcher.stamp[tcp] = nil
199 dispatcher.sending.set:remove(tcp) 199 dispatcher.sending.set:remove(tcp)
200 dispatcher.sending.cortn[tcp] = nil 200 dispatcher.sending.cortn[tcp] = nil
201 dispatcher.receiving.set:remove(tcp) 201 dispatcher.receiving.set:remove(tcp)
202 dispatcher.receiving.cortn[tcp] = nil 202 dispatcher.receiving.cortn[tcp] = nil
203 return tcp:close() 203 return tcp:close()
204 end 204 end
205 return setmetatable(wrap, metat) 205 return setmetatable(wrap, metat)
@@ -207,12 +207,12 @@ end
207 207
208 208
209----------------------------------------------------------------------------- 209-----------------------------------------------------------------------------
210-- Our coroutine dispatcher 210-- Our coroutine dispatcher
211----------------------------------------------------------------------------- 211-----------------------------------------------------------------------------
212local cometat = { __index = {} } 212local cometat = { __index = {} }
213 213
214function schedule(cortn, status, operation, tcp) 214function schedule(cortn, status, operation, tcp)
215 if status then 215 if status then
216 if cortn and operation then 216 if cortn and operation then
217 operation.set:insert(tcp) 217 operation.set:insert(tcp)
218 operation.cortn[tcp] = cortn 218 operation.cortn[tcp] = cortn
@@ -233,7 +233,7 @@ function wakeup(operation, tcp)
233 kick(operation, tcp) 233 kick(operation, tcp)
234 return cortn, coroutine.resume(cortn) 234 return cortn, coroutine.resume(cortn)
235 -- othrewise, just get scheduler not to do anything 235 -- othrewise, just get scheduler not to do anything
236 else 236 else
237 return nil, true 237 return nil, true
238 end 238 end
239end 239end
@@ -249,7 +249,7 @@ end
249-- step through all active cortns 249-- step through all active cortns
250function cometat.__index:step() 250function cometat.__index:step()
251 -- check which sockets are interesting and act on them 251 -- check which sockets are interesting and act on them
252 local readable, writable = socket.select(self.receiving.set, 252 local readable, writable = socket.select(self.receiving.set,
253 self.sending.set, 1) 253 self.sending.set, 1)
254 -- for all readable connections, resume their cortns and reschedule 254 -- for all readable connections, resume their cortns and reschedule
255 -- when they yield back to us 255 -- when they yield back to us
@@ -260,7 +260,7 @@ function cometat.__index:step()
260 for _, tcp in ipairs(writable) do 260 for _, tcp in ipairs(writable) do
261 schedule(wakeup(self.sending, tcp)) 261 schedule(wakeup(self.sending, tcp))
262 end 262 end
263 -- politely ask replacement I/O functions in idle cortns to 263 -- politely ask replacement I/O functions in idle cortns to
264 -- return reporting a timeout 264 -- return reporting a timeout
265 local now = socket.gettime() 265 local now = socket.gettime()
266 for tcp, stamp in pairs(self.stamp) do 266 for tcp, stamp in pairs(self.stamp) do
@@ -271,25 +271,25 @@ function cometat.__index:step()
271 end 271 end
272end 272end
273 273
274function cometat.__index:start(func) 274function cometat.__index:start(func)
275 local cortn = coroutine.create(func) 275 local cortn = coroutine.create(func)
276 schedule(cortn, coroutine.resume(cortn)) 276 schedule(cortn, coroutine.resume(cortn))
277end 277end
278 278
279function handlert.coroutine() 279function handlert.coroutine()
280 local stamp = {} 280 local stamp = {}
281 local dispatcher = { 281 local dispatcher = {
282 stamp = stamp, 282 stamp = stamp,
283 sending = { 283 sending = {
284 name = "sending", 284 name = "sending",
285 set = newset(), 285 set = newset(),
286 cortn = {}, 286 cortn = {},
287 stamp = stamp 287 stamp = stamp
288 }, 288 },
289 receiving = { 289 receiving = {
290 name = "receiving", 290 name = "receiving",
291 set = newset(), 291 set = newset(),
292 cortn = {}, 292 cortn = {},
293 stamp = stamp 293 stamp = stamp
294 }, 294 },
295 } 295 }