Conclusion first: It turns out that Clojure’s concurrency primitives allow you, with a very moderate amount of uncomplicated code, to take advantage of parallel hardware and outperform really fast software when it doesn’t take such advantage.
I recently wrote about Clojure’s concurrency primitives and how you’d use them for a simple logfile-processing problem. The problem divides naturally in two: read the file in parallel, and process the lines in parallel. That last piece discussed the line processing, this one the file reading.
[This is part of the Concur.next series.]
Does It Work? · This code actually runs now; the first-cut unoptimized version presented here computes Wide-Finder-style stats on a Sun T2000 “Niagara” faster than a simple single-threaded Perl script while keeping, on average, over ten of its 32 logical CPUs busy; the Niagara’s real maximum throughput is achieved when it reports 16 or so as being busy, so this isn’t bad.
Perl does line-at-a-time processing as fast as any software in the world; but a Lisp layered on the JVM can bring enough parallelism to bear, without too much complexity, to come out ahead.
Yes, a multi-process version in Perl or C or whatever could probably outrun this Clojure code, but not, I suspect, at a comparable level of simplicity and clarity.
Obvious things to investigate in the interest of making this more efficient:
Changing the block size (currently 50M for the big runs).
Changing the thread count (currently 32 for the big runs).
Changing I/O strategy, currently NIO memory-mapping, which seems plausible, but who knows?
Changing the usage of concurrency primitives, currently a fairly ad-hoc mix of references and agents.
Adjusting JVM memory-management and garbage-collection options.
I might get around to doing some of this, but I think the key point has already been made; you can construct a parallel line reader in 100 or so lines of Clojure, and a parallel statistics calculator in under 40, and they run together pretty well on a modern many-core CPU.
The rest of this (long) essay walks through my first cut at a parallel line-reader, is immensely detailed, and probably only of interest to people who care about some combination of Lisp, the JVM, and concurrency primitives.
Paralines ·
For convenience, I’ll publish once again the doc-string for Paralines’
top-level function, named read-lines
.
Reads the named file and feeds the lines one at a time to the function 'destination', which will be called in parallel on many threads with two arguments, the first being a line from the file, the second 'user-data'. When all the lines have been processed, the the 'all-done' function will be called with 'user-data' as its single argument. The 'destination' function will be called on multiple threads in parallel, so its logic must be thread-safe. There is no guarantee in which order the lines from the file will be processed. The 'settings' argument is a map; the default values for the number of threads and file block-size may be overridden with the :width and :block-size values respectively.
How It Works · Paralines fires up a bunch of threads, each of which runs a simple loop, reading blocks of data out of the file, extracting lines from them, and feeding those to the user-provided destination function. The default settings are to run ten threads and read 10MB blocks; in my first cut at the big 45G Wide-Finder file, I ran 32 threads and 50MB blocks.
The blocks are mapped into memory using the
java.nio.channels.FileChannel map
function and turned into a string via java.nio.charset.CharsetDecoder
and then the
java.nio.CharBuffer
toString method. Java is always going to be at a bit of a disadvantage in
this kind processing of compared to
languages that don’t have to convert raw data to UTF-16 before they can deal
with it.
Once the data is a string, the \n
-separated lines are
extracted and processed, straightforwardly. The only hard-ish part is
handling lines which stretch across block boundaries.
Apologies in Advance · Serious Lispers, reading this, will become acutely aware that I’m not one of them. In particular, Phil Hagelberg’s masterful and instructive in which things are mapped, but also reduced (go read it!) made me keenly aware of the shallowness of my understanding of Lisp culture generally and Clojure capabilities specifically. In the code that follows, I’m certain that for nearly every function there’s a more idiomatic and functional way to accomplish the task at hand.
Nonetheless, I’m going to go ahead and write up this work. I am serving here as the stand-in for a reasonably competent and experienced developer from outside the Lisp community investigating how well modern programming systems’ concurrency features work for me.
Concurrency Where? · There are two places in Paralines where concurrency issues arise:
Dealing out chunks of the input file to the threads which will read them.
Reassembling the halves of lines which cross block boundaries, since any pair of halves might have been read in different threads.
Here are reader
and next-block
, which together
handle the job of dealing the data out to the worker threads.
reader
serves as the mainline for each thread.
1 (defn reader [ params index ]
2 (loop [ block (next-block params index)]
3 (if block
4 (do
5 (read-block (params :channel) (block :index) (block :offset) (block :size)
6 (params :destination) (params :user-data) (params :fragments) (block :last))
7 (recur (next-block params index))))))
There’s nothing terribly interesting here. The params
argument is just a read-only package of information about the task at hand;
the I/O channel, block-size, user-provided functions and arguments.
The index
argument is a Clojure ref
to the
current block index in the file, shared by all the worker threads.
The routine calls next-block
to update the
index
value and return block
, a package of the
information about the next block to be read; then it calls
read-block
to do the actual I/O. Here’s next-block
.
1 (defn next-block [params index]
2 (let [next (dosync (alter index inc))
3 this (dec next)
4 last (= this (params :block-count))
5 size (if last (params :tail-size) (params :block-size)) ]
6 (if (<= this (params :block-count))
7 { :index this :size size :last last :offset (* this (params :block-size)) }
8 nil)))
The only interesting bit here is Line 2, which safely and concurrently
increments the shared index
value. The rest of the routine
checks whether to signal end-of-file by returning nil
or fill in
a map with the info about the next block to read.
The idea is that, even though we’re working on the file in multiple
parallel threads, we’re apt to get the best results if we read the blocks out
of it in order. While we all know that Memory Is The New Disk, we
sometimes forget that Disk Is The New Tape. So a bunch of threads working
on the
data can call next-block
to read through it in natural order.
There’s what I think is an anti-pattern here; in my first cut through this code, all my functions had tons of arguments; as many as eight or nine. I’m used to Ruby and Java, where you can pass around just an object or two that contain everything you need. So I ended up using Clojure maps (essentially hash tables) to bundle up related groups of arguments. This feels like taking the first few stumbling steps toward Object-Orientation, but it also feels like I’m Doing It Wrong, somehow.
read-block
·
Here’s how Paralines reads a block
of data:
1 (defn read-block
2 "Read a block at the given index from the I/O channel, divide it up into
3 lines and send them off to the destination. Also handle partials lines from the
4 beginning and ends of blocks."
5 [ channel index offset size destination user-data fragments last ]
6 (println (str "Block " index))
7 (let [mode (. java.nio.channels.FileChannel$MapMode READ_ONLY)
8 block (. channel map mode offset size)
9 decoder (. (. java.nio.charset.Charset forName "UTF-8") newDecoder)
10 text (. (. decoder decode block) toString)
11 first-nl (. text indexOf 10)
12 front (. text substring 0 first-nl)
13 last-nl (. text lastIndexOf 10)
14 back (. text substring (inc last-nl)) ]
15
16 (send fragments patcher front index :front destination user-data)
17 (if (not last)
18 (send fragments patcher back index :back destination user-data))
19
20 (loop [ nl first-nl ]
21 (let [ from (inc nl) to (. text indexOf 10 from) ]
22 (if (< from last-nl)
23 (do
24 (destination (new String (. text substring from to)) user-data)
25 (recur to)))))))
Line 6: I have spent most of my professional life processing large datasets of one kind or another, and I’ve learned that if you’re going to maintain your sanity you have to have some diagnostic output to bolster your faith, while your code grinds away for minutes or hours at a time, that it’s actually doing something.
Line 7: Java ceremony translates into Clojure as necessary, with considerable effort made to minimize the pain.
Line 8: This is where Java NIO is asked to map the block at the
indicated offset and size into memory; presumably, on Solaris, using something
like mmap(2)
.
Note, in this series of initializations, one Clojure idiom for invoking Java
methods on objects: (. object-name method-name arg arg arg...)
Line 10: The Java CharBuffer
gets turned into a Java
String
(and, since this is mostly ASCII) doubles in size as a
side-effect. Sigh.
Lines 11-14: Bear in mind that 10 is the numeric code-point for newline.
This code is finding the leading and trailing block-spanning line fragments,
here called front
and back
, and also the first and
last newline positions, between which will be found only complete lines.
Lines 16-18: Here we queue the leading and trailing fragments for
re-assembly. The partial lines are stored in a
Clojure agent called
fragments
; and we use
Clojure’s send
primitive to fire-and-forget invocations of the
patcher
function (discussed in detail later) against
it, relying on the system to make sure that only one at a
time gets executed.
Lines 20-25: This is the loop that runs through the big string stored in
text
one line at a time. The functions used to set this up and
run it are indexOf
, lastIndexOf
, and
substring
, well-known java.lang.String
methods.
Line 24: destination
is the function the user passed in to be
called on each line of the file, and user-data
the user-provided
argument to be passed to it along with each line.
“new String” ·
This appears on Line 24 of the sample above, and in a couple of other
places below. Each line is extracted from
the big string buffer with a Java substring
method, and could in
principle be passed to the user-provided per-line function as-is.
Unfortunately, as a result of a
well-known
outstanding Java issue, should the user code keep that substring around,
the big string of which it’s part will never be garbage-collected. When you
are trying to process data which is in aggregate much larger than the amount
of memory than Java has to work with, this will predictably and quickly
lead to disaster.
Fortunately, the java.lang.String
constructor contract
promises that this invocation will create a new instance of the string
unconnected to anything else and unlikely to cause memory-mangement
problems.
Fragment Assembly ·
Here’s the patcher
function that tries to put together
complete lines from the fragments at the beginnings and ends of blocks.
1 (defn patcher
2 "fragments is a map whose keys are of the form (str index :front|:back) telling you
3 it's a partial line off the front or back of the block. When we get a front/back
4 combination that goes together, we put them together and send them off to the
5 destination function"
6 [fragments fragment index front-back destination user-data]
7
8 (let [key (str index front-back)
9 is-front (= front-back :front)
10 other-key (if is-front (str (dec index) :back) (str (inc index) :front))
11 other (fragments other-key)]
12
13 (if (nil? other)
14 (assoc fragments key (new String fragment)) ; not found, add it
15 (let [ line (if is-front (str other fragment) (str fragment other)) ]
16 (destination line user-data)
17 fragments))))
The first argument, fragments
, is the Clojure agent that
manages the unmatched line fragments. Since this routine is being invoked
with send
, it has exclusive access to fragments
and
doesn’t need to worry about synchronization.
The approach is, once again, the Simplest Thing That Could Possibly Work; a
hash table indexed by a string concatenation of block-number and the symbol
:front
or :back
.
When you get a new fragment, you
get its block number and front/back symbol. So, for example, if you got the
fragment from the :front
of block 32, you’d see whether you
already had the :back
fragment from block 31. If so, glue them
together and call the user-provided subroutine. If not, store that
32/:front
fragment and assume that 31/:back
will be
along eventually.
Note the use of (New String ...)
for garbage-collection
self-defense, as discussed above.
And finally, note that the function’s value must always be the new value of
the fragments
map; the function applied by send
is
expected to update the agent it’s applied to.
read-lines · Finally, here’s the traffic-cop function that ties it all together.
1 (defn read-lines
2 "Reads the named file and feeds the lines one at a time to the function
3 'destination', which will be called in parallel on many threads with two
4 arguments, the first being a line from the file, the second 'user-data'.
5 When all the lines have been processed, the the 'all-done' function will be
6 called with 'user-data' as its single argument. The 'destination' function
7 will be called on multiple threads in parallel, so its logic must be
8 thread-safe. There is no guarantee in which order the lines from the file
9 will be processed. The 'settings' argument is a map; the default values for the
10 number of threads and file block-size may be overridden with the :width and
11 :block-size values respectively."
12 [file-name destination all-done user-data settings]
13 (let [
14 block-size (or (settings :block-size) (* 10 1024 1024))
15 channel (. (new FileInputStream file-name) getChannel)
16 data-size (. channel size)
17 fragments (agent {})
18 block-count (quot data-size block-size)
19 thread-count (max 1 (min block-count (or (settings :width) 10)))
20 tail-size (mod data-size block-size)
21 params {:block-count block-count
22 :block-size block-size
23 :tail-size tail-size
24 :channel channel
25 :user-data user-data
26 :destination destination
27 :all-done all-done
28 :fragments fragments }
29 index (ref 0)
30 threads (make-threads thread-count params index) ]
31
32 ;; prime the first/last fragments pump
33 (send fragments patcher "" -1 :back destination user-data)
34
35 (doseq [thread threads]
36 (.start thread)
37 (Thread/sleep 100)) ; make sure the first N blocks are read in-order
38 (doseq [thread threads]
39 (.join thread))
40 (await fragments)
41 (all-done user-data)))
I’ve declined to include the make-threads
function here because
I wasn’t smart to figure out the right way of cooking it up with some
combination of map
and dorun
, so it’s a vile
loop
-based thing.
The only things here of concurrency-related interest here are the explicit
spawning of the worker threads then waiting for their completion, in Lines 35-39,
and the await
in line 40 that makes sure all the line-fragment
patching has been done.
Conclusion · It’s pretty short. It’s pretty readable. It parallelizes well on modern hardware. It was written by someone who is not a Lisper and had never touched Clojure a few weeks ago. What’s not to like?
Comment feed for ongoing:
From: Tkil (Nov 19 2009, at 18:08)
Tim --
Regarding sending lots of parameters around (or a hash of context), one Lisp-ish method I've seen is to use dynamic binding.
It runs counter to most procedural and OOP thinking, where you want to minimize the scope of any given value; but even in those paradigms, there are things that are effectively global anyway (program options, i/o channels, etc). And Perl has a huge amount of context all the time.
I'm sure that a proper Lisper will correct me, but using dynamic binding might solve some of your issues (especially the global params like block size and thread count).
[link]
From: Paul Smith (Nov 19 2009, at 18:46)
Just a quick-pass observation: it seems to be idiomatic Clojure to use the (binding) macro for handling runtime configuration and settings, rather than bundling up and handing around a hash map from top-level functions down through the call stack.
http://clojure.org/api#toc123
So for example defining a var:
(def *num-threads* 10)
With the *s being themselves idiomatic for a top-level var. Your lower-level functions would refer to the var as needed. Then at runtime:
(binding [*num-threads* 20]
(read-lines ...))
Sort of like a let but dynamically scoped instead of lexically.
[link]
From: stand (Nov 19 2009, at 22:27)
That technomancy link is quite interesting. I think that the pmap function is quite magical.
[link]
From: David Roussel (Nov 20 2009, at 01:33)
I guess this is obvious, but worth pointing out. Converting a 50MB of bytes into UTF-8 requires an extra 100MB. But if you assume LF code page and hunt for lines in byte world then you only need convert one line at a time to a String and thus UTF-8. Not only is slot less memory needed, but it'll be cache local, oh and you won't need to call new String later on.
[link]
From: Jeff Rose (Nov 20 2009, at 03:59)
You can simplify things in a few spots by using some of the Clojure helpers for java interop.
To create a new instance of a class you can append a period. So instead of this:
(new String 1234)
you can do this:
(String. 1234)
I think it's more idiomatic to call java methods with (.method object), which more closely mirrors a typical Clojure function call.
(. text lastIndexOf 10)
becomes:
(.lastIndexOf text 10)
Last, you might find it a bit clearer to use a forward-slash for static method calls, and when you need to do successive method calls in a chain, instead of having code that looks like this:
(. (. java.nio.charset.Charset forName "UTF-8") newDecoder)
you get:
(.newDecoder (java.nio.charset.Charset/forName "UTF-8"))
Checkout the .. and doto forms that help cut down code when operating on Java objects also. I think all this stuff helps to make your code tighter and more easily grokked because it cuts out unnecessary parenthesis and periods.
[link]
From: Phil Hagelberg (Nov 20 2009, at 10:10)
You're too kind. While my version is interesting from the perspective of someone learning more about higher-order functions, commenters submitted a pmap-based version that absolutely smokes my agent implementation. This NIO memory-mapping stuff is really interesting on the disk side of things; thanks for posting.
[link]
From: Dmitriy V'jukov (Nov 23 2009, at 02:54)
Hi Tom,
I am curious as to what result you are able to achieve with this implementation. You've described a lot of details but seems to miss the performance. It's interesting to see where Clojure falls into (LOC, performance) space on this problem. Are you going to update the Results page when you complete the implementation?
I've independently come up with basically the same design for IO. I.e. I have something along the lines of your paralines, that creates several worker threads that do IO in parallel but sequentially regarding file, and then feeds user callback with independent lines, and when whole file is processed it calls aggregate user callback. However I handle line fragments differently - thread i (i.e. thread that read i-th file block) sends line fragment to thread i+1, and then it's responsibility of thread i+1 to process the line fragment.
Your approach seems slightly simpler.
It works Ok up to 10M lines input file (i.e. when whole file is cached). However after testing on 218M input file I had to rethink the whole approach. I've figured out that on 150MB/sec (disc speed) I am able to crunch lines with basically 1 thread (there is some 2000 cycles per line available). Probably I will submit purely single-threaded program, I will be interesting to see how fast it will be able to go... and basically on what we are trying to improve, because, you know, concurrent processing can't be called simplification in any way. Concurrent processing is always about performance, and when we are talking about performance even agent-oriented models do not provide simplicity because you still have to take into account all such things as granularity, agent overload, temporal locality and spatial locality. So I think concurrent program must be at least faster that single-threaded to be worth doing.
As for multi-threaded processing I think the only way to go (to improve on current C++ implementations) is to reuse cached file data. You know, if you just downloaded or generated the file, or process the file with several programs, or process the file with single program but several times (with different parameters), in all that cases it's reasonably to assume that significant part of the file is still in OS cache. Why not try to take advantage of this? If one always reads the file sequentially from begin to end, well, it's a kind of the best way to most efficiently cold data in cache, i.e. the most efficient way to *not* reuse data in cache.
And reading from file cache goes at some 5 times faster than reading from disk. So multi-threading is definitely of help here.
[link]
From: Preston L. Bannister (Nov 27 2009, at 23:09)
Note that results are distorted somewhat by the fact that basic file reads for anything built on the JVM are shockingly slow (http://bannister.us/weblog/2008/06/22/why-fileinputstream-is-slow/). Not an intrinsic limitation of Java (or the JVM), but a surprising implementation choice.
Rather throws all the subsequent discussion off-kilter.
[link]
From: Tim (Nov 28 2009, at 08:50)
Preston,note that we're using channel.map as opposed to read. I'd assumed that was using mmap() under the covers. I should do like you did and look at the source.
[link]