Implementing MapReduce in Erlang is surprisingly easy. Joe Armstrong, the inventor and main architect of Erlang, published a sample pmap/2 function that can be used as a parallel equivalent of the standard lists:map/2:
pmap(F, L) ->
    S = self(),
    Pids = lists:map(fun(I) ->
                         spawn(fun() -> do_f(S, F, I) end)
                     end, L),
    gather(Pids).

gather([H|T]) ->
    receive
        {H, Ret} -> [Ret|gather(T)]
    end;
gather([]) ->
    [].

do_f(Parent, F, I) ->
    Parent ! {self(), (catch F(I))}.

Joe didn't provide any particular examples of using pmap, so I wrote a test function that computes a sequence of Fibonacci numbers. First, let's start an Erlang shell with the -smp flag, which enables it to use multiple CPUs and CPU cores:
erl -smp

Now define a Fibonacci function:
fib(0) -> 0;
fib(1) -> 1;
fib(N) when N > 0 -> fib(N-1) + fib(N-2).

and generate a test list as a sequence of numbers (for example from 0 to 40):
L = lists:seq(0,40).

To process every element of the list L with the fib/1 function, we use lists:map/2:
lists:map(fun(X) -> fib(X) end, L).

To do the computation in parallel, we use pmap/2 instead of lists:map/2:
pmap(fun(X) -> fib(X) end, L).

On my laptop with a Core2 Duo T5450 1.66 GHz, the parallel version of fib/1 reduced execution time from 53.405534 to 31.354125 seconds (as measured by timer:tc/3). On a dual Xeon E5430 2.66 GHz (8 cores total) the speedup was from 25.635218 to 10.017358 seconds. This is less than what Joe Armstrong achieved (he probably used more sophisticated test functions and different hardware than a regular PC), but it clearly shows how easy it is to parallelize an Erlang program: just replace every instance of map/2 with pmap/2 and you're done.
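For reference, timings like the ones above can be taken with timer:tc/3. Here is a minimal sketch; the escript wrapper and the smaller input range (0..25 instead of 0..40, to keep the run short) are my additions, not from the article:

```erlang
#!/usr/bin/env escript
%% Sketch: timing the sequential fib/1 run with timer:tc/3.
%% timer:tc(Module, Function, Args) returns {Microseconds, Result}.
main(_) ->
    L = lists:seq(0, 25),
    {Micros, _Result} = timer:tc(lists, map, [fun fib/1, L]),
    io:format("lists:map took ~p seconds~n", [Micros / 1000000]).

fib(0) -> 0;
fib(1) -> 1;
fib(N) when N > 0 -> fib(N-1) + fib(N-2).
```

The same call with pmap in place of lists:map gives the parallel timing.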
I prepared a modified version of pmap/2, which spawns processes not only on the local machine, but also on all nodes in a cluster:
pmap(F, L) ->
    S = self(),
    Nod = [node()|nodes()],
    {Pids, _} = lists:mapfoldl(
        fun(I, {N1, N2}) ->
            N = case N1 of
                    [] -> N2;
                    _  -> N1
                end,
            [H|T] = N,
            {spawn(H, fun() -> do_f(S, F, I) end), {T, N2}}
        end, {Nod, Nod}, L),
    gather(Pids).

Using the modified pmap/2, I got similar results with a cluster of two Erlang shells running on a single Core2 Duo machine as with Joe's function in a single shell started in SMP mode. This version of pmap/2 also lets you rebalance computations by starting as many Erlang shells as there are CPUs (or CPU cores) on multiple machines, and joining them into one cluster.
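Setting up such a two-shell cluster on one machine can look like the following sketch; the node names and cookie value are examples of mine, not from the article:

```shell
# Start the first named node (nodes must share a cookie to connect):
erl -sname node1 -setcookie pmap_demo
# ...and in a second terminal on the same machine:
erl -sname node2 -setcookie pmap_demo
```

Then, from node1's shell, net_adm:ping/1 connects the nodes: `net_adm:ping('node2@yourhost').` (with your actual hostname) should return `pong`, after which `nodes().` lists the other node and the modified pmap/2 will spawn work on it.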
4 comments:
Unfortunately, it can't scale linearly because the messages need to be copied. However, it scales much better than any imperative program :)
The main problem here is that pmap/2 gathers its results sequentially, so a single round of computations can finish no sooner than its slowest worker. Joe Armstrong managed to achieve a speedup factor of 7 with 16 CPUs, and no better, despite increasing the number of CPUs to 24.
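A full round is still bounded by the slowest worker, but tagging each result with its list position and collecting replies in whatever order they arrive at least lets results be acted on as they complete. A sketch of such a gather (my variation, not the article's code; it assumes the caller's mailbox holds no other {integer(), _} messages):

```erlang
-module(pmap_any).
-export([pmap/2]).

%% A pmap/2 variant that gathers results in arrival order instead of
%% pid-list order, then sorts them back into list order at the end.
pmap(F, L) ->
    S = self(),
    Indexed = lists:zip(lists:seq(1, length(L)), L),
    [spawn(fun() -> S ! {Idx, (catch F(I))} end) || {Idx, I} <- Indexed],
    gather(length(L), []).

gather(0, Acc) ->
    %% All replies received: restore the original list order.
    [R || {_Idx, R} <- lists:keysort(1, Acc)];
gather(N, Acc) ->
    receive
        {Idx, Ret} when is_integer(Idx) -> gather(N - 1, [{Idx, Ret} | Acc])
    end.
```

For example, `pmap_any:pmap(fun(X) -> X * X end, [1,2,3,4,5]).` returns `[1,4,9,16,25]` regardless of which worker finishes first.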
Take a look at
http://code.google.com/p/plists/
I tried the plists module; here are the results with -smp enable on a Core2 Duo T9400:
pmap - 23.25s
lists:map - 39.88s
plists:map - 22.12s
The map function from the plists module is slightly quicker than the pmap function above.