aboutsummaryrefslogtreecommitdiff
path: root/etc/dispatch.lua
diff options
context:
space:
mode:
authorDiego Nehab <diego@tecgraf.puc-rio.br>2005-08-19 01:35:26 +0000
committerDiego Nehab <diego@tecgraf.puc-rio.br>2005-08-19 01:35:26 +0000
commit5e8ae76248ed31496dc6fef7855498a0479159ed (patch)
treeb72e99154f4901b503dbbe883445bee6c42ef70e /etc/dispatch.lua
parent0c3cdd5ef2485a79d6fec9261f2850c41577d5b3 (diff)
downloadluasocket-5e8ae76248ed31496dc6fef7855498a0479159ed.tar.gz
luasocket-5e8ae76248ed31496dc6fef7855498a0479159ed.tar.bz2
luasocket-5e8ae76248ed31496dc6fef7855498a0479159ed.zip
Dispatcher working for check-links. Need to get it working with forwarder.
Diffstat (limited to 'etc/dispatch.lua')
-rw-r--r--etc/dispatch.lua267
1 files changed, 267 insertions, 0 deletions
diff --git a/etc/dispatch.lua b/etc/dispatch.lua
new file mode 100644
index 0000000..e6c14a6
--- /dev/null
+++ b/etc/dispatch.lua
@@ -0,0 +1,267 @@
1-----------------------------------------------------------------------------
2-- A hacked dispatcher module
3-- LuaSocket sample files
4-- Author: Diego Nehab
5-- RCS ID: $$
6-----------------------------------------------------------------------------
7local base = _G
8local socket = require("socket")
9local coroutine = require("coroutine")
10module("dispatch")
11
12-- if too much time goes by without any activity in one of our sockets, we
13-- just kill it
14TIMEOUT = 10
15
16-----------------------------------------------------------------------------
17-- Mega hack. Don't try to do this at home.
18-----------------------------------------------------------------------------
19-- Lua 5.1 has coroutine.running(). We need it here, so we use this terrible
20-- hack to emulate it in Lua itself
21-- This is very inefficient, but is very good for debugging.
22local running
23local resume = coroutine.resume
24function coroutine.resume(co, ...)
25 running = co
26 return resume(co, unpack(arg))
27end
28
29function coroutine.running()
30 return running
31end
32
33-----------------------------------------------------------------------------
34-- Mega hack. Don't try to do this at home.
35-----------------------------------------------------------------------------
36-- we can't yield across calls to protect, so we rewrite it with coxpcall
37-- make sure you don't require any module that uses socket.protect before
38-- loading our hack
39function socket.protect(f)
40 return f
41end
42
43function socket.protect(f)
44 return function(...)
45 local co = coroutine.create(f)
46 while true do
47 local results = {resume(co, unpack(arg))}
48 local status = table.remove(results, 1)
49 if not status then
50 if type(results[1]) == 'table' then
51 return nil, results[1][1]
52 else error(results[1]) end
53 end
54 if coroutine.status(co) == "suspended" then
55 arg = {coroutine.yield(unpack(results))}
56 else
57 return unpack(results)
58 end
59 end
60 end
61end
62
63-----------------------------------------------------------------------------
64-- socket.tcp() replacement for non-blocking I/O
65-----------------------------------------------------------------------------
66local function newtrap(dispatcher)
67 -- try to create underlying socket
68 local tcp, error = socket.tcp()
69 if not tcp then return nil, error end
70 -- put it in non-blocking mode right away
71 tcp:settimeout(0)
72 -- metatable for trap produces new methods on demand for those that we
73 -- don't override explicitly.
74 local metat = { __index = function(table, key)
75 table[key] = function(...)
76 return tcp[key](tcp, unpack(arg))
77 end
78 end}
79 -- does user want to do his own non-blocking I/O?
80 local zero = false
81 -- create a trap object that will behave just like a real socket object
82 local trap = { }
83 -- we ignore settimeout to preserve our 0 timeout, but record whether
84 -- the user wants to do his own non-blocking I/O
85 function trap:settimeout(mode, value)
86 if value == 0 then
87 zero = true
88 else
89 zero = false
90 end
91 return 1
92 end
93 -- send in non-blocking mode and yield on timeout
94 function trap:send(data, first, last)
95 first = (first or 1) - 1
96 local result, error
97 while true do
98 -- tell dispatcher we want to keep sending before we yield
99 dispatcher.sending:insert(tcp)
100 -- mark time we started waiting
101 dispatcher.context[tcp].last = socket.gettime()
102 -- return control to dispatcher
103 -- if upon return the dispatcher tells us we timed out,
104 -- return an error to whoever called us
105 if coroutine.yield() == "timeout" then
106 return nil, "timeout"
107 end
108 -- try sending
109 result, error, first = tcp:send(data, first+1, last)
110 -- if we are done, or there was an unexpected error,
111 -- break away from loop
112 if error ~= "timeout" then return result, error, first end
113 end
114 end
115 -- receive in non-blocking mode and yield on timeout
116 -- or simply return partial read, if user requested timeout = 0
117 function trap:receive(pattern, partial)
118 local error = "timeout"
119 local value
120 while true do
121 -- tell dispatcher we want to keep receiving before we yield
122 dispatcher.receiving:insert(tcp)
123 -- mark time we started waiting
124 dispatcher.context[tcp].last = socket.gettime()
125 -- return control to dispatcher
126 -- if upon return the dispatcher tells us we timed out,
127 -- return an error to whoever called us
128 if coroutine.yield() == "timeout" then
129 return nil, "timeout"
130 end
131 -- try receiving
132 value, error, partial = tcp:receive(pattern, partial)
133 -- if we are done, or there was an unexpected error,
134 -- break away from loop
135 if (error ~= "timeout") or zero then
136 return value, error, partial
137 end
138 end
139 end
140 -- connect in non-blocking mode and yield on timeout
141 function trap:connect(host, port)
142 local result, error = tcp:connect(host, port)
143 -- mark time we started waiting
144 dispatcher.context[tcp].last = socket.gettime()
145 if error == "timeout" then
146 -- tell dispatcher we will be able to write uppon connection
147 dispatcher.sending:insert(tcp)
148 -- return control to dispatcher
149 -- if upon return the dispatcher tells us we have a
150 -- timeout, just abort
151 if coroutine.yield() == "timeout" then
152 return nil, "timeout"
153 end
154 -- when we come back, check if connection was successful
155 result, error = tcp:connect(host, port)
156 if result or error == "already connected" then return 1
157 else return nil, "non-blocking connect failed" end
158 else return result, error end
159 end
160 -- accept in non-blocking mode and yield on timeout
161 function trap:accept()
162 local result, error = tcp:accept()
163 while error == "timeout" do
164 -- mark time we started waiting
165 dispatcher.context[tcp].last = socket.gettime()
166 -- tell dispatcher we will be able to read uppon connection
167 dispatcher.receiving:insert(tcp)
168 -- return control to dispatcher
169 -- if upon return the dispatcher tells us we have a
170 -- timeout, just abort
171 if coroutine.yield() == "timeout" then
172 return nil, "timeout"
173 end
174 end
175 return result, error
176 end
177 -- remove thread from context
178 function trap:close()
179 dispatcher.context[tcp] = nil
180 return tcp:close()
181 end
182 -- add newly created socket to context
183 dispatcher.context[tcp] = {
184 thread = coroutine.running()
185 }
186 return setmetatable(trap, metat)
187end
188
189-----------------------------------------------------------------------------
190-- Our set data structure
191-----------------------------------------------------------------------------
192local function newset()
193 local reverse = {}
194 local set = {}
195 return setmetatable(set, {__index = {
196 insert = function(set, value)
197 if not reverse[value] then
198 table.insert(set, value)
199 reverse[value] = table.getn(set)
200 end
201 end,
202 remove = function(set, value)
203 local index = reverse[value]
204 if index then
205 reverse[value] = nil
206 local top = table.remove(set)
207 if top ~= value then
208 reverse[top] = index
209 set[index] = top
210 end
211 end
212 end
213 }})
214end
215
216-----------------------------------------------------------------------------
217-- Our dispatcher API.
218-----------------------------------------------------------------------------
219local metat = { __index = {} }
220
221function metat.__index:start(func)
222 local co = coroutine.create(func)
223 assert(coroutine.resume(co))
224end
225
226function newhandler()
227 local dispatcher = {
228 context = {},
229 sending = newset(),
230 receiving = newset()
231 }
232 function dispatcher.tcp()
233 return newtrap(dispatcher)
234 end
235 return setmetatable(dispatcher, metat)
236end
237
238-- step through all active threads
239function metat.__index:step()
240 -- check which sockets are interesting and act on them
241 local readable, writable = socket.select(self.receiving,
242 self.sending, 1)
243 -- for all readable connections, resume their threads
244 for _, who in ipairs(readable) do
245 if self.context[who] then
246 self.receiving:remove(who)
247 assert(coroutine.resume(self.context[who].thread))
248 end
249 end
250 -- for all writable connections, do the same
251 for _, who in ipairs(writable) do
252 if self.context[who] then
253 self.sending:remove(who)
254 assert(coroutine.resume(self.context[who].thread))
255 end
256 end
257 -- politely ask replacement I/O functions in idle threads to
258 -- return reporting a timeout
259 local now = socket.gettime()
260 for who, data in pairs(self.context) do
261 if data.last and now - data.last > TIMEOUT then
262 self.sending:remove(who)
263 self.receiving:remove(who)
264 assert(coroutine.resume(self.context[who].thread, "timeout"))
265 end
266 end
267end