aboutsummaryrefslogtreecommitdiff
path: root/gem/ltn012.tex
blob: 0f81b864413f0993bf2dcf8b46d7336582259405 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
\documentclass[10pt]{article}
\usepackage{fancyvrb}
\usepackage{url}
\DefineVerbatimEnvironment{lua}{Verbatim}{fontsize=\small,commandchars=\@\#\%}
\DefineVerbatimEnvironment{C}{Verbatim}{fontsize=\small,commandchars=\@\#\%}
\DefineVerbatimEnvironment{mime}{Verbatim}{fontsize=\small,commandchars=\$\#\%}
\newcommand{\stick}[1]{\vbox{\setlength{\parskip}{0pt}#1}}
\newcommand{\bl}{\ensuremath{\mathtt{\backslash}}}


\title{Filters, sources, sinks, and pumps\\
      {\large or Functional programming for the rest of us}}
\author{Diego Nehab}

\begin{document}

\maketitle

\begin{abstract}
Certain data processing operations can be implemented in the 
form of filters. A filter is a function that can process data 
received in consecutive function calls, returning partial
results after each invocation.  Examples of operations that can be
implemented as filters include the end-of-line normalization
for text, Base64 and Quoted-Printable transfer content
encodings, the breaking of text into lines, SMTP dot-stuffing, 
and there are many others.  Filters become even
more powerful when we allow them to be chained together to
create composite filters. In this context, filters can be seen 
as the middle links in a chain of data transformations. Sources an sinks
are the corresponding end points of these chains. A source
is a function that produces data, chunk by chunk, and a sink
is a function that takes data, chunk by chunk. In this
article, we describe the design of an elegant interface for filters,
sources, sinks, and chaining, and illustrate each step 
with concrete examples. 
\end{abstract}


\section{Introduction}

Within the realm of networking applications, we are often
required apply transformations to streams of data. Examples
include the end-of-line normalization for text, Base64 and
Quoted-Printable transfer content encodings, breaking text
into lines with a maximum number of columns, SMTP
dot-stuffing, \texttt{gzip} compression, HTTP chunked
transfer coding, and the list goes on.

Many complex tasks require a combination of two or more such
transformations, and therefore a general mechanism for
promoting reuse is desirable. In the process of designing
\texttt{LuaSocket~2.0}, David Burgess and I were forced to deal with
this problem. The solution we reached proved to be very
general and convenient. It is based on the concepts of
filters, sources, sinks, and pumps, which we introduce
below. 

\emph{Filters} are functions that can be repeatedly invoked
with chunks of input, successively returning processed
chunks of output. More importantly, the result of
concatenating all the output chunks must be the same as the
result of applying the filter to the concatenation of all
input chunks. In fancier language, filters \emph{commute}
with the concatenation operator. As a result, chunk
boundaries are irrelevant: filters correctly handle input
data no matter how it is split. 

A \emph{chain} transparently combines the effect of one or
more filters. The interface of a chain is 
indistinguishable from the interface of its components.
This  allows a chained filter to be used wherever an atomic
filter is expected. In particular, chains can be 
themselves chained to create arbitrarily complex operations.

Filters can be seen as internal nodes in a network through
which data will flow, potentially being transformed many
times along its way.  Chains connect these nodes together.
To complete the picture, we need \emph{sources} and
\emph{sinks}. These are the initial and final nodes of the
network, respectively.  Less abstractly, a source is a
function that produces new data every time it is called.
Conversely, sinks are functions that give a final
destination to the data they receive.  Naturally, sources
and sinks can also be chained with filters to produce
filtered sources and sinks.

Finally, filters, chains, sources, and sinks are all passive
entities: they must be repeatedly invoked in order for
anything to happen.  \emph{Pumps} provide the driving force
that pushes data through the network, from a source to a
sink.

In the following sections, we start with a simplified
interface, which we later refine. The evolution we present
is not contrived: it recreates the steps we followed
ourselves as we consolidated our understanding of these
concepts within our application domain. 

\subsection{A simple example}

Let us use the end-of-line normalization of text as an
example to motivate our initial filter interface. 
Assume we are given text in an unknown end-of-line
convention (including possibly mixed conventions) out of the
commonly found Unix (LF), Mac OS (CR), and DOS (CRLF)
conventions. We would like to be able to write code like the
following:
\begin{quote}
\begin{lua}
@stick#
local in = source.chain(source.file(io.stdin), normalize("\r\n"))
local out = sink.file(io.stdout)
pump.all(in, out)
%
\end{lua}
\end{quote}

This program should read data from the standard input stream
and normalize the end-of-line markers to the canonic CRLF
marker, as defined by the MIME standard. Finally, the
normalized text should be sent to the standard output
stream.  We use a \emph{file source} that produces data from
standard input, and chain it with a filter that normalizes
the data. The pump then repeatedly obtains data from the
source, and passes it to the \emph{file sink}, which sends
it to the standard output.

In the code above, the \texttt{normalize} \emph{factory} is a
function that creates our normalization filter. This filter
will replace any end-of-line marker with the canonic
`\verb|\r\n|' marker. The initial filter interface is
trivial: a filter function receives a chunk of input data,
and returns a chunk of processed data.  When there are no
more input data left, the caller notifies the filter by invoking
it with a \texttt{nil} chunk. The filter responds by returning
the final chunk of processed data.

Although the interface is extremely simple, the
implementation is not so obvious. A normalization filter
respecting this interface needs to keep some kind of context
between calls. This is because a chunk boundary may lie between 
the CR and LF characters marking the end of a line. This
need for contextual storage motivates the use of
factories: each time the factory is invoked, it returns a
filter with its own context so that we can have several
independent filters being used at the same time.  For
efficiency reasons, we must avoid the obvious solution of 
concatenating all the input into the context before
producing any output. 

To that end, we break the implementation into two parts:
a low-level filter, and a factory of high-level filters. The
low-level filter is implemented in C and does not maintain
any context between function calls. The high-level filter
factory, implemented in Lua, creates and returns a
high-level filter that maintains whatever context the low-level
filter needs, but isolates the user from its internal
details. That way, we take advantage of C's efficiency to
perform the hard work, and take advantage of Lua's
simplicity for the bookkeeping.

\subsection{The Lua part of the filter}

Below is the complete implementation of the factory of high-level
end-of-line normalization filters:
\begin{quote}
\begin{lua}
@stick#
function filter.cycle(low, ctx, extra)
  return function(chunk)
    local ret
    ret, ctx = low(ctx, chunk, extra)
    return ret
  end
end
%

@stick#
function normalize(marker)
  return cycle(eol, 0, marker)
end
%
\end{lua}
\end{quote}

The \texttt{normalize} factory simply calls a more generic
factory, the \texttt{cycle} factory. This factory receives a
low-level filter, an initial context, and an extra
parameter, and returns a new high-level filter.  Each time
the high-level filer is passed a new chunk, it invokes the
low-level filter with the previous context, the new chunk,
and the extra argument.  It is the low-level filter that
does all the work, producing the chunk of processed data and
a new context. The high-level filter then updates its
internal context, and returns the processed chunk of data to
the user.  Notice that we take advantage of Lua's lexical
scoping to store the context in a closure between function
calls.  

Concerning the low-level filter code, we must first accept
that there is no perfect solution to the end-of-line marker
normalization problem. The difficulty comes from an
inherent ambiguity in the definition of empty lines within
mixed input. However, the following solution works well for
any consistent input, as well as for non-empty lines in
mixed input. It also does a reasonable job with empty lines
and serves as a good example of how to implement a low-level
filter.

The idea is to consider both CR and~LF as end-of-line
\emph{candidates}.  We issue a single break if any candidate
is seen alone, or followed by a different candidate.  In
other words, CR~CR~and LF~LF each issue two end-of-line
markers, whereas CR~LF~and LF~CR issue only one marker each.
This method correctly handles the Unix, DOS/MIME, VMS, and Mac
OS conventions.

\subsection{The C part of the filter}

Our low-level filter is divided into two simple functions.
The inner function performs the normalization itself. It takes
each input character in turn, deciding what to output and
how to modify the context. The context tells if the last
processed character was an end-of-line candidate, and if so, 
which candidate it was. For efficiency, it uses 
Lua's auxiliary library's buffer interface: 
\begin{quote}
\begin{C}
@stick#
@#define candidate(c) (c == CR || c == LF)
static int process(int c, int last, const char *marker, 
    luaL_Buffer *buffer) {
  if (candidate(c)) {
    if (candidate(last)) {
      if (c == last) luaL_addstring(buffer, marker);
      return 0;
    } else {
      luaL_addstring(buffer, marker);
      return c;
    }
  } else {
    luaL_putchar(buffer, c);
    return 0;
  }
}
%
\end{C}
\end{quote}

The outer function simply interfaces with Lua.  It receives the
context and input chunk (as well as an optional
custom end-of-line marker), and returns the transformed
output chunk and the new context:
\begin{quote}
\begin{C}
@stick#
static int eol(lua_State *L) {
  int ctx = luaL_checkint(L, 1);
  size_t isize = 0;
  const char *input = luaL_optlstring(L, 2, NULL, &isize);
  const char *last = input + isize;
  const char *marker = luaL_optstring(L, 3, CRLF);
  luaL_Buffer buffer;
  luaL_buffinit(L, &buffer);
  if (!input) {
    lua_pushnil(L);
    lua_pushnumber(L, 0);
    return 2;
  }
  while (input < last)
    ctx = process(*input++, ctx, marker, &buffer);
  luaL_pushresult(&buffer);
  lua_pushnumber(L, ctx);
  return 2;
}
%
\end{C}
\end{quote}

Notice that if the input chunk is \texttt{nil}, the operation
is considered to be finished. In that case, the loop will
not execute a single time and the context is reset to the
initial state.  This allows the filter to be reused many
times. 

When designing your own filters, the challenging part is to
decide what will be in the context. For line breaking, for
instance, it could be the number of bytes left in the
current line.  For Base64 encoding, it could be a string
with the bytes that remain after the division of the input
into 3-byte atoms. The MIME module in the \texttt{LuaSocket}
distribution has many other examples. 

\section{Filter chains}

Chains add a lot to the power of filters.  For example,
according to the standard for Quoted-Printable encoding,
text must be normalized to a canonic end-of-line marker
prior to encoding.  To help specifying complex
transformations like this, we define a chain factory that
creates a composite filter from one or more filters.  A
chained filter passes data through all its components, and
can be used wherever a primitive filter is accepted.

The chaining factory is very simple. The auxiliary
function~\texttt{chainpair} chains two filters together,
taking special care if the chunk is the last.  This is
because the final \texttt{nil} chunk notification has to be
pushed through both filters in turn:  
\begin{quote}
\begin{lua}
@stick#
local function chainpair(f1, f2)
  return function(chunk)
    local ret = f2(f1(chunk))
    if chunk then return ret
    else return ret .. f2() end
  end
end
%

@stick#
function filter.chain(...)
  local f = arg[1]
  for i = 2, @#arg do
    f = chainpair(f, arg[i])
  end
  return f
end
%
\end{lua}
\end{quote}

Thanks to the chain factory, we can
define the Quoted-Printable conversion as such:
\begin{quote}
\begin{lua}
@stick#
local qp = filter.chain(normalize("\r\n"), 
  encode("quoted-printable"))
local in = source.chain(source.file(io.stdin), qp)
local out = sink.file(io.stdout)
pump.all(in, out)
%
\end{lua}
\end{quote}

\section{Sources, sinks, and pumps}

The filters we introduced so far act as the internal nodes
in a network of transformations. Information flows from node
to node (or rather from one filter to the next) and is
transformed along the way. Chaining filters together is our
way to connect nodes in this network. As the starting point
for the network, we need a source node that produces the
data. In the end of the network, we need a sink node that
gives a final destination to the data.

\subsection{Sources}

A source returns the next chunk of data each time it is
invoked. When there is no more data, it simply returns
\texttt{nil}.  In the event of an error, the source can inform the
caller by returning \texttt{nil} followed by an error message.

Below are two simple source factories. The \texttt{empty} source
returns no data, possibly returning an associated error
message. The \texttt{file} source works harder, and 
yields the contents of a file in a chunk by chunk fashion:
\begin{quote}
\begin{lua}
@stick#
function source.empty(err)
  return function()
    return nil, err
  end
end
%

@stick#
function source.file(handle, io_err)
  if handle then 
    return function()
      local chunk = handle:read(2048)
      if not chunk then handle:close() end
      return chunk
    end
  else return source.empty(io_err or "unable to open file") end
end
%
\end{lua}
\end{quote}

\subsection{Filtered sources}

A filtered source passes its data through the
associated filter before returning it to the caller. 
Filtered sources are useful when working with
functions that get their input data from a source (such as
the pump in our first example). By chaining a source with one or
more filters, the function can be transparently provided
with filtered data, with no need to change its interface. 
Here is a factory that does the job:
\begin{quote}
\begin{lua}
@stick#
function source.chain(src, f)
  return source.simplify(function()
    if not src then return nil end
    local chunk, err = src()
    if not chunk then 
      src = nil
      return f(nil)
    else return f(chunk) end
  end)
end
%
\end{lua}
\end{quote}

\subsection{Sinks}

Just as we defined an interface a data source, 
we can also define an interface for a data destination. 
We call any function respecting this
interface a \emph{sink}. In our first example, we used a
file sink connected to the standard output. 

Sinks receive consecutive chunks of data, until the end of
data is signaled by a \texttt{nil} chunk. A sink can be
notified of an error with an optional extra argument that
contains the error message, following a \texttt{nil} chunk.  
If a sink detects an error itself, and
wishes not to be called again, it can return \texttt{nil},
followed by an error message. A return value that
is not \texttt{nil} means the source will accept more data.

Below are two useful sink factories. 
The table factory creates a sink that stores
individual chunks into an array. The data can later be
efficiently concatenated into a single string with Lua's
\texttt{table.concat} library function. The \texttt{null} sink 
simply discards the chunks it receives:
\begin{quote}
\begin{lua}
@stick#
function sink.table(t)
  t = t or {}
  local f = function(chunk, err)
    if chunk then table.insert(t, chunk) end
    return 1
  end
  return f, t
end
%

@stick#
local function null()
  return 1
end

function sink.null()
  return null
end
%
\end{lua}
\end{quote}

Naturally, filtered sinks are just as useful as filtered
sources. A filtered sink passes each chunk it receives
through the associated filter before handing it to the
original sink.  In the following example, we use a source
that reads from the standard input.  The input chunks are
sent to a table sink, which has been coupled with a
normalization filter.  The filtered chunks are then
concatenated from the output array, and finally sent to
standard out:
\begin{quote}
\begin{lua}
@stick#
local in = source.file(io.stdin)
local out, t = sink.table()
out = sink.chain(normalize("\r\n"), out)
pump.all(in, out)
io.write(table.concat(t))
%
\end{lua}
\end{quote}

\subsection{Pumps}

Adrian Sietsma noticed that, although not on purpose, our 
interface for sources is compatible with Lua iterators. 
That is, a source can be neatly used in conjunction 
with \texttt{for} loops.  Using our file
source as an iterator, we can write the following code:
\begin{quote}
\begin{lua}
@stick#
for chunk in source.file(io.stdin) do
  io.write(chunk)
end
%
\end{lua}
\end{quote}

Loops like this will always be present because everything 
we designed so far is passive. Sources, sinks, filters: none
of them can do anything on their own. The operation of
pumping all data a source can provide into a sink is so
common that it deserves its own function:
\begin{quote}
\begin{lua}
@stick#
function pump.step(src, snk)
  local chunk, src_err = src()
  local ret, snk_err = snk(chunk, src_err)
  if chunk and ret then return 1
  else return nil, src_err or snk_err end
end
%

@stick#
function pump.all(src, snk, step)
    step = step or pump.step
    while true do
        local ret, err = step(src, snk)
        if not ret then
            if err then return nil, err
            else return 1 end
        end 
    end
end
%
\end{lua}
\end{quote}

The \texttt{pump.step} function moves one chunk of data from
the source to the sink. The \texttt{pump.all} function takes
an optional \texttt{step} function and uses it to pump all the
data from the source to the sink. We can now use everything
we have to write a program that reads a binary file from
disk and stores it in another file, after encoding it to the
Base64 transfer content encoding:
\begin{quote}
\begin{lua}
@stick#
local in = source.chain(
  source.file(io.open("input.bin", "rb")), 
  encode("base64"))
local out = sink.chain(
  wrap(76),
  sink.file(io.open("output.b64", "w")))
pump.all(in, out)
%
\end{lua}
\end{quote}

The way we split the filters here is not intuitive, on
purpose.  Alternatively, we could have chained the Base64
encode filter and the line-wrap filter together, and then
chain the resulting filter with either the file source or
the file sink. It doesn't really matter. The Base64 and the
line wrapping filters are part of the \texttt{LuaSocket}
distribution.

\section{Exploding filters}

Our current filter interface has one flagrant shortcoming.
When David Burgess was writing his \texttt{gzip} filter, he
noticed that a decompression filter can explode a small
input chunk into a huge amount of data. To address this
problem, we decided to change the filter interface and allow 
exploding filters to return large quantities of output data 
in a chunk by chunk manner. 

More specifically, after passing each chunk of input to
a filter, and collecting the first chunk of output, the
user must now loop to receive other chunks from the filter until no
filtered data is left. Within these secondary calls, the
caller passes an empty string to the filter. The filter
responds with an empty string when it is ready for the next
input chunk. In the end, after the user passes a
\texttt{nil} chunk notifying the filter that there is no
more input data, the filter might still have to produce too
much output data to return in a single chunk. The user has
to loop again, now passing \texttt{nil} to the filter each time,
until the filter itself returns \texttt{nil} to notify the
user it is finally done.

Fortunately, it is very easy to modify a filter to respect
the new interface. In fact, the end-of-line translation
filter we presented earlier already conforms to it.  The
complexity is encapsulated within the chaining functions,
which must now include a loop. Since these functions only
have to be written once, the user is rarely affected.
Interestingly, the modifications do not have a measurable
negative impact in the performance of filters that do
not need the added flexibility. On the other hand, for a
small price in complexity, the changes make exploding
filters practical. 

\section{A complex example}

The LTN12 module in the \texttt{LuaSocket} distribution
implements the ideas we have described. The MIME
and SMTP modules are especially integrated with LTN12, 
and can be used to showcase the expressive power of filters,
sources, sinks, and pumps. Below is an example 
of how a user would proceed to define and send a
multipart message, with attachments, using \texttt{LuaSocket}:
\begin{quote}
\begin{mime}
local smtp = require"socket.smtp"
local mime = require"mime"
local ltn12 = require"ltn12"

local message = smtp.message{
  headers = {
    from = "Sicrano <sicrano@example.com>",
    to = "Fulano <fulano@example.com>",
    subject = "A message with an attachment"},
  body = {
    preamble = "Hope you can see the attachment\r\n",
    [1] = {
      body = "Here is our logo\r\n"},
    [2] = {
      headers = {
        ["content-type"] = 'image/png; name="luasocket.png"',
        ["content-disposition"] = 
          'attachment; filename="luasocket.png"',
        ["content-description"] = 'LuaSocket logo',
        ["content-transfer-encoding"] = "BASE64"},
      body = ltn12.source.chain(
        ltn12.source.file(io.open("luasocket.png", "rb")),
        ltn12.filter.chain(
          mime.encode("base64"),
          mime.wrap()))}}}

assert(smtp.send{
  rcpt = "<fulano@example.com>",
  from = "<sicrano@example.com>",
  source = message})
\end{mime}
\end{quote}

The \texttt{smtp.message} function receives a table
describing the message, and returns a source. The
\texttt{smtp.send} function takes this source, chains it with the
SMTP dot-stuffing filter, connects a socket sink
with the server, and simply pumps the data. The message is never 
assembled in memory.  Everything is produced on demand, 
transformed in small pieces, and sent to the server in chunks, 
including the file attachment that is loaded from disk and 
encoded on the fly. It just works.

\section{Conclusions}

In this article, we introduced the concepts of filters,
sources, sinks, and pumps to the Lua language. These are
useful tools for stream processing in general. Sources provide
a simple abstraction for data acquisition. Sinks provide an
abstraction for final data destinations. Filters define an
interface for data transformations.  The chaining of
filters, sources and sinks provides an elegant way to create
arbitrarily complex data transformations from simpler
components. Pumps simply move the data through.  

\end{document}