aboutsummaryrefslogtreecommitdiff
path: root/etc
diff options
context:
space:
mode:
authorDiego Nehab <diego@tecgraf.puc-rio.br>2005-08-23 05:53:14 +0000
committerDiego Nehab <diego@tecgraf.puc-rio.br>2005-08-23 05:53:14 +0000
commit773e35ced30fa2c03ddb2a332bf8a9aebb56aa44 (patch)
tree03e8e9a4bd64b9424006315c8e720c3f2841a751 /etc
parent5e8ae76248ed31496dc6fef7855498a0479159ed (diff)
downloadluasocket-773e35ced30fa2c03ddb2a332bf8a9aebb56aa44.tar.gz
luasocket-773e35ced30fa2c03ddb2a332bf8a9aebb56aa44.tar.bz2
luasocket-773e35ced30fa2c03ddb2a332bf8a9aebb56aa44.zip
Compiled on Windows. Fixed a bunch of stuff. Almost ready to release.
Implemented a nice dispatcher! Non-blocking check-links and forward server use the dispatcher.
Diffstat (limited to 'etc')
-rw-r--r--etc/check-links.lua31
-rw-r--r--etc/dispatch.lua296
-rw-r--r--etc/forward.lua65
3 files changed, 242 insertions, 150 deletions
diff --git a/etc/check-links.lua b/etc/check-links.lua
index e06cc91..725cd2a 100644
--- a/etc/check-links.lua
+++ b/etc/check-links.lua
@@ -5,33 +5,26 @@
5-- Author: Diego Nehab 5-- Author: Diego Nehab
6-- RCS ID: $$ 6-- RCS ID: $$
7----------------------------------------------------------------------------- 7-----------------------------------------------------------------------------
8local dispatch, url, http, handler 8local url = require("socket.url")
9local dispatch = require("dispatch")
10local http = require("socket.http")
11dispatch.TIMEOUT = 10
9 12
13-- make sure the user knows how to invoke us
10arg = arg or {} 14arg = arg or {}
11if table.getn(arg) < 1 then 15if table.getn(arg) < 1 then
12 print("Usage:\n luasocket check-links.lua [-n] {<url>}") 16 print("Usage:\n luasocket check-links.lua [-n] {<url>}")
13 exit() 17 exit()
14end 18end
15 19
16if arg[1] ~= "-n" then 20-- '-n' means we are running in non-blocking mode
17 -- if using blocking I/O, simulate dispatcher interface 21if arg[1] == "-n" then
18 url = require("socket.url") 22 -- if non-blocking I/O was requested, use real dispatcher interface
19 http = require("socket.http")
20 handler = {
21 start = function(self, f)
22 f()
23 end,
24 tcp = socket.tcp
25 }
26 http.TIMEOUT = 10
27else
28 -- if non-blocking I/O was requested, disable dispatcher
29 table.remove(arg, 1) 23 table.remove(arg, 1)
30 dispatch = require("dispatch") 24 handler = dispatch.newhandler("coroutine")
31 dispatch.TIMEOUT = 10 25else
32 url = require("socket.url") 26 -- if using blocking I/O, use fake dispatcher interface
33 http = require("socket.http") 27 handler = dispatch.newhandler("sequential")
34 handler = dispatch.newhandler()
35end 28end
36 29
37local nthreads = 0 30local nthreads = 0
diff --git a/etc/dispatch.lua b/etc/dispatch.lua
index e6c14a6..98fa8a8 100644
--- a/etc/dispatch.lua
+++ b/etc/dispatch.lua
@@ -11,23 +11,33 @@ module("dispatch")
11 11
12-- if too much time goes by without any activity in one of our sockets, we 12-- if too much time goes by without any activity in one of our sockets, we
13-- just kill it 13-- just kill it
14TIMEOUT = 10 14TIMEOUT = 60
15 15
16----------------------------------------------------------------------------- 16-----------------------------------------------------------------------------
17-- Mega hack. Don't try to do this at home. 17-- We implement 3 types of dispatchers:
18-- sequential
19-- coroutine
20-- threaded
21-- The user can choose whatever one is needed
18----------------------------------------------------------------------------- 22-----------------------------------------------------------------------------
19-- Lua 5.1 has coroutine.running(). We need it here, so we use this terrible 23local handlert = {}
20-- hack to emulate it in Lua itself 24
21-- This is very inefficient, but is very good for debugging. 25-- default handler is coroutine
22local running 26function newhandler(mode)
23local resume = coroutine.resume 27 mode = mode or "coroutine"
24function coroutine.resume(co, ...) 28 return handlert[mode]()
25 running = co
26 return resume(co, unpack(arg))
27end 29end
28 30
29function coroutine.running() 31local function seqstart(self, func)
30 return running 32 return func()
33end
34
35-- sequential handler simply calls the functions and doesn't wrap I/O
36function handlert.sequential()
37 return {
38 tcp = socket.tcp,
39 start = seqstart
40 }
31end 41end
32 42
33----------------------------------------------------------------------------- 43-----------------------------------------------------------------------------
@@ -37,14 +47,10 @@ end
37-- make sure you don't require any module that uses socket.protect before 47-- make sure you don't require any module that uses socket.protect before
38-- loading our hack 48-- loading our hack
39function socket.protect(f) 49function socket.protect(f)
40 return f
41end
42
43function socket.protect(f)
44 return function(...) 50 return function(...)
45 local co = coroutine.create(f) 51 local co = coroutine.create(f)
46 while true do 52 while true do
47 local results = {resume(co, unpack(arg))} 53 local results = {coroutine.resume(co, unpack(arg))}
48 local status = table.remove(results, 1) 54 local status = table.remove(results, 1)
49 if not status then 55 if not status then
50 if type(results[1]) == 'table' then 56 if type(results[1]) == 'table' then
@@ -61,48 +67,68 @@ function socket.protect(f)
61end 67end
62 68
63----------------------------------------------------------------------------- 69-----------------------------------------------------------------------------
64-- socket.tcp() replacement for non-blocking I/O 70-- Simple set data structure. O(1) everything.
71-----------------------------------------------------------------------------
72local function newset()
73 local reverse = {}
74 local set = {}
75 return setmetatable(set, {__index = {
76 insert = function(set, value)
77 if not reverse[value] then
78 table.insert(set, value)
79 reverse[value] = table.getn(set)
80 end
81 end,
82 remove = function(set, value)
83 local index = reverse[value]
84 if index then
85 reverse[value] = nil
86 local top = table.remove(set)
87 if top ~= value then
88 reverse[top] = index
89 set[index] = top
90 end
91 end
92 end
93 }})
94end
95
96-----------------------------------------------------------------------------
97-- socket.tcp() wrapper for the coroutine dispatcher
65----------------------------------------------------------------------------- 98-----------------------------------------------------------------------------
66local function newtrap(dispatcher) 99local function cowrap(dispatcher, tcp, error)
67 -- try to create underlying socket
68 local tcp, error = socket.tcp()
69 if not tcp then return nil, error end 100 if not tcp then return nil, error end
70 -- put it in non-blocking mode right away 101 -- put it in non-blocking mode right away
71 tcp:settimeout(0) 102 tcp:settimeout(0)
72 -- metatable for trap produces new methods on demand for those that we 103 -- metatable for wrap produces new methods on demand for those that we
73 -- don't override explicitly. 104 -- don't override explicitly.
74 local metat = { __index = function(table, key) 105 local metat = { __index = function(table, key)
75 table[key] = function(...) 106 table[key] = function(...)
76 return tcp[key](tcp, unpack(arg)) 107 arg[1] = tcp
108 return tcp[key](unpack(arg))
77 end 109 end
110 return table[key]
78 end} 111 end}
79 -- does user want to do his own non-blocking I/O? 112 -- does our user want to do his own non-blocking I/O?
80 local zero = false 113 local zero = false
81 -- create a trap object that will behave just like a real socket object 114 -- create a wrap object that will behave just like a real socket object
82 local trap = { } 115 local wrap = { }
83 -- we ignore settimeout to preserve our 0 timeout, but record whether 116 -- we ignore settimeout to preserve our 0 timeout, but record whether
84 -- the user wants to do his own non-blocking I/O 117 -- the user wants to do his own non-blocking I/O
85 function trap:settimeout(mode, value) 118 function wrap:settimeout(value, mode)
86 if value == 0 then 119 if value == 0 then zero = true
87 zero = true 120 else zero = false end
88 else
89 zero = false
90 end
91 return 1 121 return 1
92 end 122 end
93 -- send in non-blocking mode and yield on timeout 123 -- send in non-blocking mode and yield on timeout
94 function trap:send(data, first, last) 124 function wrap:send(data, first, last)
95 first = (first or 1) - 1 125 first = (first or 1) - 1
96 local result, error 126 local result, error
97 while true do 127 while true do
98 -- tell dispatcher we want to keep sending before we yield 128 -- return control to dispatcher and tell it we want to send
99 dispatcher.sending:insert(tcp)
100 -- mark time we started waiting
101 dispatcher.context[tcp].last = socket.gettime()
102 -- return control to dispatcher
103 -- if upon return the dispatcher tells us we timed out, 129 -- if upon return the dispatcher tells us we timed out,
104 -- return an error to whoever called us 130 -- return an error to whoever called us
105 if coroutine.yield() == "timeout" then 131 if coroutine.yield(dispatcher.sending, tcp) == "timeout" then
106 return nil, "timeout" 132 return nil, "timeout"
107 end 133 end
108 -- try sending 134 -- try sending
@@ -114,41 +140,35 @@ local function newtrap(dispatcher)
114 end 140 end
115 -- receive in non-blocking mode and yield on timeout 141 -- receive in non-blocking mode and yield on timeout
116 -- or simply return partial read, if user requested timeout = 0 142 -- or simply return partial read, if user requested timeout = 0
117 function trap:receive(pattern, partial) 143 function wrap:receive(pattern, partial)
118 local error = "timeout" 144 local error = "timeout"
119 local value 145 local value
120 while true do 146 while true do
121 -- tell dispatcher we want to keep receiving before we yield 147 -- return control to dispatcher and tell it we want to receive
122 dispatcher.receiving:insert(tcp)
123 -- mark time we started waiting
124 dispatcher.context[tcp].last = socket.gettime()
125 -- return control to dispatcher
126 -- if upon return the dispatcher tells us we timed out, 148 -- if upon return the dispatcher tells us we timed out,
127 -- return an error to whoever called us 149 -- return an error to whoever called us
128 if coroutine.yield() == "timeout" then 150 if coroutine.yield(dispatcher.receiving, tcp) == "timeout" then
129 return nil, "timeout" 151 return nil, "timeout"
130 end 152 end
131 -- try receiving 153 -- try receiving
132 value, error, partial = tcp:receive(pattern, partial) 154 value, error, partial = tcp:receive(pattern, partial)
133 -- if we are done, or there was an unexpected error, 155 -- if we are done, or there was an unexpected error,
134 -- break away from loop 156 -- break away from loop. also, if the user requested
157 -- zero timeout, return all we got
135 if (error ~= "timeout") or zero then 158 if (error ~= "timeout") or zero then
136 return value, error, partial 159 return value, error, partial
137 end 160 end
138 end 161 end
139 end 162 end
140 -- connect in non-blocking mode and yield on timeout 163 -- connect in non-blocking mode and yield on timeout
141 function trap:connect(host, port) 164 function wrap:connect(host, port)
142 local result, error = tcp:connect(host, port) 165 local result, error = tcp:connect(host, port)
143 -- mark time we started waiting
144 dispatcher.context[tcp].last = socket.gettime()
145 if error == "timeout" then 166 if error == "timeout" then
146 -- tell dispatcher we will be able to write uppon connection 167 -- return control to dispatcher. we will be writable when
147 dispatcher.sending:insert(tcp) 168 -- connection succeeds.
148 -- return control to dispatcher
149 -- if upon return the dispatcher tells us we have a 169 -- if upon return the dispatcher tells us we have a
150 -- timeout, just abort 170 -- timeout, just abort
151 if coroutine.yield() == "timeout" then 171 if coroutine.yield(dispatcher.sending, tcp) == "timeout" then
152 return nil, "timeout" 172 return nil, "timeout"
153 end 173 end
154 -- when we come back, check if connection was successful 174 -- when we come back, check if connection was successful
@@ -158,110 +178,124 @@ local function newtrap(dispatcher)
158 else return result, error end 178 else return result, error end
159 end 179 end
160 -- accept in non-blocking mode and yield on timeout 180 -- accept in non-blocking mode and yield on timeout
161 function trap:accept() 181 function wrap:accept()
162 local result, error = tcp:accept() 182 while 1 do
163 while error == "timeout" do 183 -- return control to dispatcher. we will be readable when a
164 -- mark time we started waiting 184 -- connection arrives.
165 dispatcher.context[tcp].last = socket.gettime()
166 -- tell dispatcher we will be able to read uppon connection
167 dispatcher.receiving:insert(tcp)
168 -- return control to dispatcher
169 -- if upon return the dispatcher tells us we have a 185 -- if upon return the dispatcher tells us we have a
170 -- timeout, just abort 186 -- timeout, just abort
171 if coroutine.yield() == "timeout" then 187 if coroutine.yield(dispatcher.receiving, tcp) == "timeout" then
172 return nil, "timeout" 188 return nil, "timeout"
173 end 189 end
190 local client, error = tcp:accept()
191 if error ~= "timeout" then
192 return cowrap(dispatcher, client, error)
193 end
174 end 194 end
175 return result, error
176 end 195 end
177 -- remove thread from context 196 -- remove cortn from context
178 function trap:close() 197 function wrap:close()
179 dispatcher.context[tcp] = nil 198 dispatcher.stamp[tcp] = nil
199 dispatcher.sending.set:remove(tcp)
200 dispatcher.sending.cortn[tcp] = nil
201 dispatcher.receiving.set:remove(tcp)
202 dispatcher.receiving.cortn[tcp] = nil
180 return tcp:close() 203 return tcp:close()
181 end 204 end
182 -- add newly created socket to context 205 return setmetatable(wrap, metat)
183 dispatcher.context[tcp] = {
184 thread = coroutine.running()
185 }
186 return setmetatable(trap, metat)
187end 206end
188 207
208
189----------------------------------------------------------------------------- 209-----------------------------------------------------------------------------
190-- Our set data structure 210-- Our coroutine dispatcher
191----------------------------------------------------------------------------- 211-----------------------------------------------------------------------------
192local function newset() 212local cometat = { __index = {} }
193 local reverse = {} 213
194 local set = {} 214function schedule(cortn, status, operation, tcp)
195 return setmetatable(set, {__index = { 215 if status then
196 insert = function(set, value) 216 if cortn and operation then
197 if not reverse[value] then 217 operation.set:insert(tcp)
198 table.insert(set, value) 218 operation.cortn[tcp] = cortn
199 reverse[value] = table.getn(set) 219 operation.stamp[tcp] = socket.gettime()
200 end
201 end,
202 remove = function(set, value)
203 local index = reverse[value]
204 if index then
205 reverse[value] = nil
206 local top = table.remove(set)
207 if top ~= value then
208 reverse[top] = index
209 set[index] = top
210 end
211 end
212 end 220 end
213 }}) 221 else error(operation) end
214end 222end
215 223
216----------------------------------------------------------------------------- 224function kick(operation, tcp)
217-- Our dispatcher API. 225 operation.cortn[tcp] = nil
218----------------------------------------------------------------------------- 226 operation.set:remove(tcp)
219local metat = { __index = {} } 227end
220 228
221function metat.__index:start(func) 229function wakeup(operation, tcp)
222 local co = coroutine.create(func) 230 local cortn = operation.cortn[tcp]
223 assert(coroutine.resume(co)) 231 -- if cortn is still valid, wake it up
232 if cortn then
233 kick(operation, tcp)
234 return cortn, coroutine.resume(cortn)
235 -- othrewise, just get scheduler not to do anything
236 else
237 return nil, true
238 end
224end 239end
225 240
226function newhandler() 241function abort(operation, tcp)
227 local dispatcher = { 242 local cortn = operation.cortn[tcp]
228 context = {}, 243 if cortn then
229 sending = newset(), 244 kick(operation, tcp)
230 receiving = newset() 245 coroutine.resume(cortn, "timeout")
231 }
232 function dispatcher.tcp()
233 return newtrap(dispatcher)
234 end 246 end
235 return setmetatable(dispatcher, metat)
236end 247end
237 248
238-- step through all active threads 249-- step through all active cortns
239function metat.__index:step() 250function cometat.__index:step()
240 -- check which sockets are interesting and act on them 251 -- check which sockets are interesting and act on them
241 local readable, writable = socket.select(self.receiving, 252 local readable, writable = socket.select(self.receiving.set,
242 self.sending, 1) 253 self.sending.set, 1)
243 -- for all readable connections, resume their threads 254 -- for all readable connections, resume their cortns and reschedule
244 for _, who in ipairs(readable) do 255 -- when they yield back to us
245 if self.context[who] then 256 for _, tcp in ipairs(readable) do
246 self.receiving:remove(who) 257 schedule(wakeup(self.receiving, tcp))
247 assert(coroutine.resume(self.context[who].thread))
248 end
249 end 258 end
250 -- for all writable connections, do the same 259 -- for all writable connections, do the same
251 for _, who in ipairs(writable) do 260 for _, tcp in ipairs(writable) do
252 if self.context[who] then 261 schedule(wakeup(self.sending, tcp))
253 self.sending:remove(who)
254 assert(coroutine.resume(self.context[who].thread))
255 end
256 end 262 end
257 -- politely ask replacement I/O functions in idle threads to 263 -- politely ask replacement I/O functions in idle cortns to
258 -- return reporting a timeout 264 -- return reporting a timeout
259 local now = socket.gettime() 265 local now = socket.gettime()
260 for who, data in pairs(self.context) do 266 for tcp, stamp in pairs(self.stamp) do
261 if data.last and now - data.last > TIMEOUT then 267 if tcp.class == "tcp{client}" and now - stamp > TIMEOUT then
262 self.sending:remove(who) 268 abort(self.sending, tcp)
263 self.receiving:remove(who) 269 abort(self.receiving, tcp)
264 assert(coroutine.resume(self.context[who].thread, "timeout"))
265 end 270 end
266 end 271 end
267end 272end
273
274function cometat.__index:start(func)
275 local cortn = coroutine.create(func)
276 schedule(cortn, coroutine.resume(cortn))
277end
278
279function handlert.coroutine()
280 local stamp = {}
281 local dispatcher = {
282 stamp = stamp,
283 sending = {
284 name = "sending",
285 set = newset(),
286 cortn = {},
287 stamp = stamp
288 },
289 receiving = {
290 name = "receiving",
291 set = newset(),
292 cortn = {},
293 stamp = stamp
294 },
295 }
296 function dispatcher.tcp()
297 return cowrap(dispatcher, socket.tcp())
298 end
299 return setmetatable(dispatcher, cometat)
300end
301
diff --git a/etc/forward.lua b/etc/forward.lua
new file mode 100644
index 0000000..eac98ae
--- /dev/null
+++ b/etc/forward.lua
@@ -0,0 +1,65 @@
1-- load our favourite library
2local dispatch = require("dispatch")
3local handler = dispatch.newhandler()
4
5-- make sure the user knows how to invoke us
6if table.getn(arg) < 1 then
7 print("Usage")
8 print(" lua forward.lua <iport:ohost:oport> ...")
9 os.exit(1)
10end
11
12-- function to move data from one socket to the other
13local function move(foo, bar)
14 local live
15 while 1 do
16 local data, error, partial = foo:receive(2048)
17 live = data or error == "timeout"
18 data = data or partial
19 local result, error = bar:send(data)
20 if not live or not result then
21 foo:close()
22 bar:close()
23 break
24 end
25 end
26end
27
28-- for each tunnel, start a new server
29for i, v in ipairs(arg) do
30 -- capture forwarding parameters
31 local _, _, iport, ohost, oport = string.find(v, "([^:]+):([^:]+):([^:]+)")
32 assert(iport, "invalid arguments")
33 -- create our server socket
34 local server = assert(handler.tcp())
35 assert(server:setoption("reuseaddr", true))
36 assert(server:bind("*", iport))
37 assert(server:listen(32))
38 -- handler for the server object loops accepting new connections
39 handler:start(function()
40 while 1 do
41 local client = assert(server:accept())
42 assert(client:settimeout(0))
43 -- for each new connection, start a new client handler
44 handler:start(function()
45 -- handler tries to connect to peer
46 local peer = assert(handler.tcp())
47 assert(peer:settimeout(0))
48 assert(peer:connect(ohost, oport))
49 -- if sucessful, starts a new handler to send data from
50 -- client to peer
51 handler:start(function()
52 move(client, peer)
53 end)
54 -- afte starting new handler, enter in loop sending data from
55 -- peer to client
56 move(peer, client)
57 end)
58 end
59 end)
60end
61
62-- simply loop stepping the server
63while 1 do
64 handler:step()
65end