Thursday, August 14, 2008

MapReduce in Erlang

MapReduce is a Java framework for parallel computations, designed and developed by Google. It is often referred to not only as a platform, but also more general as an algorithm (or a "programming model", as Google calls it itself). The main idea behind MapReduce is to spawn computations on a set of data to many single processes (map), and then gather their results (reduce).

Implementing MapReduce in Erlang is unbelievably trivial. Joe Armstrong, Erlang inventor and main architect, published a sample pmap/2 function, which can be used as a parallel equivalent of standard Erlang lists:map/2:
pmap(F, L) ->
    S = self(),
    Pids = lists:map(fun(I) ->
                         spawn(fun() -> do_f(S, F, I) end)
                     end, L),
    gather(Pids).

gather([H|T]) ->
    receive
        {H, Ret} -> [Ret|gather(T)]
    end;
gather([]) ->
    [].

do_f(Parent, F, I) ->
    Parent ! {self(), (catch F(I))}.
Joe didn't provide any particular examples of using pmap, so I wrote a test function that makes a sequence of Fibonacci numbers. First, let's start an Erlang shell with -smp flag, that enables it to use many CPUs and many CPU cores:
erl -smp
Now define a Fibonacci function:
fib(0) -> 0;
fib(1) -> 1;
fib(N) when N > 0 -> fib(N-1) + fib(N-2).
and generate a test list as a sequence of numbers (for example from 0 to 40):
L = lists:seq(0,40).
To make every element of the list L to be processed by fib/1 function we use lists:map/2:
lists:map(fun(X) -> fib(X) end, L).
To do a parallel computation we need to use pmap/2 instead of lists:map/2:
pmap(fun(X) -> fib(X) end, L).
On my laptop with Core2 Duo T5450 1.66 Ghz running the parallel version of fib/1 function reduced execution time from 53.405534 to 31.354125 seconds (as measured by timer:tc/3). On dual Xeon E5430 2.66 Ghz (8 cores total) the speedup was from 25.635218 seconds to 10.017358. It is less than Joe Armstrong managed to achieve (he probably used more sophisticated test functions and different hardware than a regular PC), but it clearly shows how easy it is to parallelize an Erlang program - just replace every instance of map/2 with pmap/2 and you're done.

I prepared a modified version of pmap/2, which spawns processes not only on local machine, but also on all nodes in a cluster:
pmap(F, L) ->
    S = self(),
    Nod = [node()|nodes()],
    {Pids, _} = lists:mapfoldl(
        fun(I, {N1, N2}) ->
            case N1 == [] of
                true -> N = N2;
                false -> N = N1
            end,
            [H|T] = N,
            {spawn(H, fun() -> do_f(S, F, I) end), {T, N2}}
        end, {Nod, Nod}, L),
    gather(Pids).
Using the modified pmap/2, I got similar results with a cluster of two Erlang shells running on a single Core2 Duo machine as with Joe's function on a single shell started in smp mode. This version of pmap/2 also allows you to rebalance computations by starting as many Erlang shells as there are CPUs (or CPU cores) on multiple machines, and joining them in one cluster.

4 comments:

Matt said...

Unfortunately, it can't scale linearly because the messages need to be copied. However, it scales much better than any imperative program :)

Krzysztof (Christopher) Kliś said...

The main problem here is that pmap/2 results are gathered sequentially, so the execution time of single round of computations is at most as fast as the execution time of the slowest instance. Joe Armstrong managed to achieve a speedup factor of 7 with 16 CPUs, and no better, despite increasing the number of CPUs up to 24.

Gleber said...

Take a look at
http://code.google.com/p/plists/

djstrong said...

I tried using plists module and that's the result with -smp enable on Core2 Duo T9400:
pmap - 23.25s
lists:map - 39.88s
plists:map - 22.12s

Map function from plists module is a little bit quicker than above pmap function.