I recently wrote up some Clojure-based Wide Finder work, in Parallel I/O and References. Driven mostly by ideas from commenters, I did some refactoring and knob-spinning. The results are interim at best, and on US-Thanksgiving eve almost nobody’s looking, but it’s good to get this stuff on the record.
[This is part of the Concur.next series.]
Map-Reduce · Both John Hart and Avi Bryant argued that Map/Reduce is the natural solution to this problem, since the threads can do all their work without interacting until they’re done. So I went ahead and implemented it. I had to change the interface a bit; the per-line call gets an accumulator argument and may return an updated version, which will be passed into the next call. And if the per-line call gets nil for its line argument, that means that there are no more lines and it has to proceed to the collation phase. So per-call code now looks like this:
 1 (defn record [ target accum ]
 2   (if accum
 3     (if-let [ counter (accum target) ]
 4       (assoc accum target (inc counter))
 5       (assoc accum target 1))
 6     { target 1 }))
 7
 8 (defn proc-line [ line so-far accum ]
 9   (if line
10     (if-let [[_ target] (re-find re line)]
11       (record target accum)
12       accum)
13     (send so-far (fn [existing new] (merge-with + existing new)) accum)))
It’s pretty self-explanatory; the accum is a thread-private map of counts. The counting code starting at Line 2 is a little klunky, because it has to create an accumulator if there isn’t one already, and create a counter for the input target if there isn’t one already. But not rocket science.
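To make the accumulator’s shape concrete, here’s how record behaves at the REPL (my worked example, not from the original post):

```clojure
;; record either starts a fresh accumulator map or bumps a count in
;; the existing one.
(record "foo" nil)        ; => {"foo" 1}
(record "foo" {"foo" 1})  ; => {"foo" 2}
(record "bar" {"foo" 2})  ; => {"foo" 2, "bar" 1}
```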
Also, you gotta love Clojure’s merge-with, which combines hash tables in the most obvious way imaginable; see Line 13.
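For illustration (my example, not from the post), merge-with resolves key collisions with the supplied function:

```clojure
;; merge-with merges maps, combining values at duplicate keys with +
(merge-with + {"a" 2 "b" 1} {"a" 3 "c" 5})
;; => {"a" 5, "b" 1, "c" 5}
```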
Its relative speed, you ask? Well, let’s look at agents first.
Agents · Other commenters pointed out that the whole thing is decoupled, so why not just use agents for an Erlang-like model? Well, OK then; the client code is indeed a bit simpler:
(defn add [so-far target]
  (if-let [count (so-far target)]
    (assoc so-far target (inc count))
    (assoc so-far target 1)))

(defn proc-line [ line so-far accum ]
  (if line
    (if-let [[_ target] (re-find re line)]
      (send so-far add target))))
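To sketch how this hangs together — this is my assumed driver code, not anything from the post — so-far would be an agent holding the shared count map, and send queues add against it asynchronously:

```clojure
;; hypothetical driver: so-far is an agent holding the shared map
(def so-far (agent {}))

(send so-far add "example.com")
(send so-far add "example.com")

(await so-far)   ; block until the queued sends have been applied
@so-far          ; => {"example.com" 2}
```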
There’s a Clojure update-in function that could have simplified the add function, but who cares, this is real simple and easy to understand.
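For the record, here’s a sketch of what that update-in version might look like — my guess at it, not code from the post:

```clojure
;; possible update-in flavor of add: bump the count at target,
;; treating a missing key as 0
(defn add [so-far target]
  (update-in so-far [target] #(inc (or % 0))))
```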
Performance · At this point, the tone becomes a bit grumpy. Since the map/reduce implementation seemed the fastest, I thought I’d use it to measure the throughput while I spun the thread-count and block-size dials.
I explored block sizes between 8M and 64M, and thread counts between 8 and 64. The results were clear as mud. Generally, 16M was the best block size, and the thread-count results are all over the map. The best time was 27:28, the worst 36:39. With the refs approach the time was 36:08, and the pure agent code took 50:28. The best combination is not even twice as fast as the worst.
And times in the 20-to-50-minute range for processing 45G of data are not exactly world-shaking; check out the Wide Finder 2 Results page.
A little research reveals that a simple single-threaded Clojure loop that maps blocks out of the file and uses java.nio.charset.CharsetDecoder to turn it into Java UTF-16 characters burns just over 70 minutes and is apparently CPU-limited. On the downside, this sucks! On the upside it does show that the concurrency is reducing the suckaliciousness.
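That single-threaded loop would look roughly like this sketch — my reconstruction, not Tim’s actual code; the decode-block name, the UTF-8 charset, and the explicit offset/size parameters are all my assumptions:

```clojure
(import '(java.io RandomAccessFile)
        '(java.nio.channels FileChannel$MapMode)
        '(java.nio.charset Charset))

;; Map one block of the file into memory and decode its bytes into
;; Java UTF-16 chars via a CharsetDecoder, as the post describes.
(defn decode-block [path offset size]
  (with-open [raf (RandomAccessFile. path "r")]
    (let [buf     (.map (.getChannel raf)
                        FileChannel$MapMode/READ_ONLY offset size)
          decoder (.newDecoder (Charset/forName "UTF-8"))]
      ;; CharsetDecoder.decode returns a java.nio.CharBuffer
      (.decode decoder buf))))
```

A real driver would walk the file block by block and feed each decoded CharBuffer to the line-matching code.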
I wondered how much of that 70 minutes might be a Clojure tax, so I wrote a Java version, as close to identical as I could manage, and it took... about 68 minutes. Good for Clojure.
Difficulties and Dmitriy · At the moment, I’m running this on the whole 45G dataset because I thought the parallelization work should exercise both CPU and I/O capabilities. Dmitriy V’jukov posted a long comment which contained an implicit challenge to that notion. Most high-performance applications these days go to immense lengths to avoid doing I/O where throughput or latency really matter; thus memcached and related technologies.
So maybe I should run this on just 4 or 5G and take steps to ensure that the cache is hot before I start... hmm.
Moving On · I’ll kick the Clojure concurrency primitives around a bit longer until I’m more confident that I haven’t missed too much that’s important. My feeling at the moment is that they’re basically sound; a little expensive to use, but there’s plenty of scope to improve that.
On the other hand, the simplicity and straightforwardness of the agent-based approach is not lost on me. I’m trying to shake loose my Erlang bias, but this isn’t helping.
Comment feed for ongoing:
From: Nona Myous (Nov 25 2009, at 17:52)
Do you use type annotations in the performance critical code?
Consider sharing your code. More eyes might spot issues.
From: John (Nov 26 2009, at 00:42)
Hey Tim. I am the commenter "John" from the technomancy thread. I just left a reply there but figured I'd drop a note
here since his comments are moderated.
Also, I just noticed that my comment on his blog doesn't have my most recent version which I had discussed with him on #clojure.
Try this version which reads chunks of lines and is one or two orders of magnitude faster than the original I posted in the comments over there (depending on the regex you use):
http://pastie.org/715782
You can reach me at (my first name at milo.com).
From: James Abley (Nov 26 2009, at 05:26)
+1 for sharing the code. Please put it on github or similar.
From: Randy Hudson (Nov 26 2009, at 09:35)
Just a nit: you can use the 3-argument 'get' function to simplify the counter handling:
(assoc accum target (inc (get accum target 0)))