Saturday, July 31, 2010

MapReduce with Gambit Scheme & Termite

Termite is a library that runs on top of Gambit Scheme and implements concurrency model inspired by Erlang (there is also a Chicken Scheme port, but it's so far incomplete). Since I have already posted about Termite some time ago, I will not go into details how to write distributed applications with it. Instead, I want to show you how to use Termite to implement a simple MapReduce algorithm.

A trivial implementation uses a simple worker process that executes some code, and a pmap function which spawns multiple workers and collects the results:
(define (worker)
  (let* ((msg (?)) (pid (car msg)) (fun (cadr msg)) (arg (caddr msg)))
    (! pid (fun arg))))

(define (pmap fun lst)
  ; spawn workers
  (for-each 
    (lambda (x) (let ((p (spawn worker))) (! p (list (self) fun x))))
    lst)
  ; collect results
  (let loop ((i (length lst)) (result '()))
    (if (= 0 i) (reverse result) (loop (- i 1) (cons (?) result)))))
So far it works similar to its Erlang equivalent. However, this version does not guarantee (just like the Erlang one) that you get the results in the same order as the arguments passed to pmap. Let's consider function f, which holds execution of the current thread for a given number of seconds:
(define (f x) (thread-sleep! x) x)
If you run the following code:
(pmap f (list 1 2 3 2 1))
The result will be:
(1 1 2 2 3)
Since the first results on the list will come from the processes that finished first.

A solution to this problem is to make a single worker process using a sequencer to mark spawned threads. Numbers generated by the sequencer are then used to sort the results to achieve the correct order of the result list:
(define (worker)
  ; init sequencer
  (let loop ((n 0))
    (let ((msg (?)))
      ; terminate worker on request
      (if (not (eqv? 'exit (car msg)))
        (let ((pid (car msg)) (fun (cadr msg)) (arg (caddr msg)))
          ; start a new thread
          (spawn (lambda () (! pid (cons n (fun arg)))))
          ; increase sequencer
          (loop (+ n 1)))))))

(define (pmap fun args)
  ; start worker
  (let ((p (spawn worker)) (result '()))
    ; send data to the worker
    (for-each (lambda (x) (! p (list (self) fun x))) args)
    ; collect results
    (let loop ((i (length args)) (lst '()))
      (if (= 0 i)
        (set! result lst)
        (loop (- i 1) (cons (?) lst))))
    ; terminate worker
    (! p (list 'exit))
    ; sort results
    (let loop ((in (qsort result <)) (out '()))
      (if (null? in)
        out
        (loop (cdr in) (cons (cdar in) out))))))
You can use any function you wish to sort the results. I used a modified version of quicksort found at Rosetta Code:
(define (qsort l gt?)
  (let q ((l l))
    (if (null? l)
      l
      (let ((s (split-by (cdr l) (lambda (x) (gt? (car x) (caar l))))))
        (append (q (car s)) (list (car l)) (q (cdr s)))))))

(define (split-by l p)
  (let loop ((low (list)) (high (list)) (l l))
    (if (null? l)
     (cons low high)
     (if (p (car l))
       (loop low (cons (car l) high) (cdr l))
       (loop (cons (car l) low) high (cdr l))))))
Now when you run the test again, you get the result list in the same order as the argument list, no matter what the execution time of single processes was:
(pmap f (list 1 2 3 2 1))

(1 2 3 2 1)
You can download the pmap library here.

No comments: