Saturday, July 31, 2010

MapReduce with Gambit Scheme & Termite

Termite is a library that runs on top of Gambit Scheme and implements a concurrency model inspired by Erlang (there is also a Chicken Scheme port, but it is incomplete so far). Since I already posted about Termite some time ago, I will not go into detail about how to write distributed applications with it. Instead, I want to show you how to use Termite to implement a simple MapReduce algorithm.

A trivial implementation uses a simple worker process that applies a function to a single argument and sends the result back, and a pmap function that spawns one worker per list element and collects the results:
(define (worker)
  ; wait for a (pid fun arg) message and send (fun arg) back to pid
  (let* ((msg (?)) (pid (car msg)) (fun (cadr msg)) (arg (caddr msg)))
    (! pid (fun arg))))

(define (pmap fun lst)
  ; spawn workers
  (for-each 
    (lambda (x) (let ((p (spawn worker))) (! p (list (self) fun x))))
    lst)
  ; collect results
  (let loop ((i (length lst)) (result '()))
    (if (= 0 i) (reverse result) (loop (- i 1) (cons (?) result)))))
So far this works similarly to its Erlang equivalent. However, just like the Erlang one, this version does not guarantee that you get the results in the same order as the arguments passed to pmap. Consider a function f that suspends the current thread for a given number of seconds and then returns that number:
(define (f x) (thread-sleep! x) x)
If you run the following code:
(pmap f (list 1 2 3 2 1))
The result will be:
(1 1 2 2 3)
This is because the results at the front of the list come from the processes that finished first.
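The same unordered behavior is easy to reproduce outside Termite. Below is a rough C analogue of the first pmap, a minimal sketch assuming POSIX threads and a mutex-protected array standing in for the Termite mailbox (Termite does not necessarily work this way internally; mailbox, job, pmap_unordered and square are names made up for this example). Each thread appends its result when it finishes, so out[] is filled in completion order rather than argument order:

```c
#include <pthread.h>
#include <stddef.h>

#define MAX_TASKS 64

typedef int (*task_fn)(int);

/* Hypothetical stand-in for a Termite mailbox: results are appended
   in the order in which the worker threads finish. */
struct mailbox {
    pthread_mutex_t lock;
    int results[MAX_TASKS];
    size_t count;
};

/* One unit of work, like the (list (self) fun x) message. */
struct job {
    struct mailbox *box;  /* where to send the result */
    task_fn fun;
    int arg;
};

/* Worker thread: compute fun(arg) and "send" it to the mailbox. */
static void *worker(void *p) {
    struct job *j = p;
    int value = j->fun(j->arg);

    pthread_mutex_lock(&j->box->lock);
    j->box->results[j->box->count++] = value;
    pthread_mutex_unlock(&j->box->lock);
    return NULL;
}

/* Spawn one thread per argument and wait for all of them. out[] receives
   the results in completion order. Returns 0 on success, -1 on error. */
int pmap_unordered(task_fn fun, const int *args, size_t n, int *out) {
    pthread_t threads[MAX_TASKS];
    struct job jobs[MAX_TASKS];
    struct mailbox box;
    size_t i, started = 0;
    int rc = 0;

    if (n > MAX_TASKS)
        return -1;
    pthread_mutex_init(&box.lock, NULL);
    box.count = 0;

    for (i = 0; i < n; i++) {
        jobs[i].box = &box;
        jobs[i].fun = fun;
        jobs[i].arg = args[i];
        if (pthread_create(&threads[i], NULL, worker, &jobs[i]) != 0) {
            rc = -1;
            break;
        }
        started++;
    }
    /* joining plays the role of receiving one message per worker */
    for (i = 0; i < started; i++)
        pthread_join(threads[i], NULL);

    for (i = 0; i < box.count; i++)
        out[i] = box.results[i];
    pthread_mutex_destroy(&box.lock);
    return rc;
}

/* Sample task for trying pmap_unordered out. */
int square(int x) { return x * x; }
```

Because the order depends on thread scheduling, only the set of values in out[] is predictable, not their positions.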

One solution to this problem is to use a single worker process with a sequencer that numbers each spawned thread. The worker sends every result back tagged with its sequence number, and pmap uses these numbers to sort the results into the same order as the argument list:
(define (worker)
  ; init sequencer
  (let loop ((n 0))
    (let ((msg (?)))
      ; terminate worker on request
      (if (not (eqv? 'exit (car msg)))
        (let ((pid (car msg)) (fun (cadr msg)) (arg (caddr msg)))
          ; start a new thread
          (spawn (lambda () (! pid (cons n (fun arg)))))
          ; increase sequencer
          (loop (+ n 1)))))))

(define (pmap fun args)
  ; start worker
  (let ((p (spawn worker)) (result '()))
    ; send data to the worker
    (for-each (lambda (x) (! p (list (self) fun x))) args)
    ; collect results
    (let loop ((i (length args)) (lst '()))
      (if (= 0 i)
        (set! result lst)
        (loop (- i 1) (cons (?) lst))))
    ; terminate worker
    (! p (list 'exit))
    ; sort results
    (let loop ((in (qsort result <)) (out '()))
      (if (null? in)
        out
        (loop (cdr in) (cons (cdar in) out))))))
You can use any function you wish to sort the results. I used a modified version of quicksort found at Rosetta Code:
(define (qsort l gt?)
  (let q ((l l))
    (if (null? l)
      l
      (let ((s (split-by (cdr l) (lambda (x) (gt? (car x) (caar l))))))
        (append (q (car s)) (list (car l)) (q (cdr s)))))))

(define (split-by l p)
  (let loop ((low (list)) (high (list)) (l l))
    (if (null? l)
      (cons low high)
      (if (p (car l))
        (loop low (cons (car l) high) (cdr l))
        (loop (cons (car l) low) high (cdr l))))))
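The split-by quicksort translates fairly directly to C. In this minimal sketch (struct tagged, split_qsort and key_less are names made up for this example), the first element is the pivot, the remaining elements go to the "high" side when gt? holds for their key and to the "low" side otherwise, and the result is low, pivot, high. Called with a less-than predicate, as pmap calls qsort with <, it produces descending sequence numbers:

```c
#include <stdlib.h>
#include <string.h>

/* A result tagged with its sequence number, like (n . value). */
struct tagged {
    int seq;
    int value;
};

typedef int (*key_pred)(int a, int b);

/* Plays the role of < when passed as gt? */
static int key_less(int a, int b) { return a < b; }

/* Sort a[0..n-1] like the Scheme qsort: first element as pivot,
   split the rest with gt?, recurse, then lay out low, pivot, high. */
static void split_qsort(struct tagged *a, size_t n, key_pred gt) {
    struct tagged pivot, *tmp;
    size_t i, nlow = 0, nhigh = 0;

    if (n < 2)
        return;
    tmp = malloc(n * sizeof *tmp);
    if (tmp == NULL)
        abort();
    pivot = a[0];
    for (i = 1; i < n; i++) {
        if (gt(a[i].seq, pivot.seq)) {
            /* "high" elements fill tmp from the back */
            tmp[n - 1 - nhigh] = a[i];
            nhigh++;
        } else {
            /* "low" elements fill tmp from the front */
            tmp[nlow] = a[i];
            nlow++;
        }
    }
    tmp[nlow] = pivot;  /* the one slot left between low and high */
    memcpy(a, tmp, n * sizeof *a);
    free(tmp);

    split_qsort(a, nlow, gt);
    split_qsort(a + nlow + 1, nhigh, gt);
}
```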
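Because qsort is called with <, it actually returns the (n . value) pairs in descending order of their sequence numbers; the final loop in pmap conses each value onto out, which reverses them into ascending order. Now when you run the test again, you get the result list in the same order as the argument list, regardless of how long each individual process took: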
(pmap f (list 1 2 3 2 1))

(1 2 3 2 1)
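The tag-and-sort step itself does not depend on Termite. A minimal C sketch, assuming the (sequence number . value) pairs have already been received in completion order (struct tagged, by_seq and reorder_results are names made up for this example), uses the standard library qsort:

```c
#include <stddef.h>
#include <stdlib.h>

/* A result tagged with the sequence number the worker assigned to it. */
struct tagged {
    int seq;
    int value;
};

/* qsort comparator: ascending by sequence number. */
static int by_seq(const void *a, const void *b) {
    const struct tagged *x = a;
    const struct tagged *y = b;
    return (x->seq > y->seq) - (x->seq < y->seq);
}

/* Restore argument order: sort the pairs by tag, then strip the tags. */
void reorder_results(struct tagged *pairs, size_t n, int *out) {
    size_t i;
    qsort(pairs, n, sizeof *pairs, by_seq);
    for (i = 0; i < n; i++)
        out[i] = pairs[i].value;
}
```

Since the sequence numbers are exactly 0 to n-1, an alternative design would be to write each value directly into out[seq] and skip sorting altogether; the sketch sorts only to mirror the Termite version.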
You can download the pmap library here.

Monday, July 19, 2010

Using trampolines in C

In my previous post I presented my implementation of trampoline code in newLISP. Since this technique is also used in non-functional programming languages, I prepared an example in C.
Let's assume we have two mutually recursive functions f1 and f2:
#include <stdio.h>

void f1(int n);
void f2(int n);

void f1(int n) {
  printf("%d\n", n);
  if (n == 0)
    printf("Blastoff!\n");
  else
    f2(n);
}

void f2(int n) {
  f1(n-1);
}

int main() {
  f1(1000000);
}
If you compile this code without compiler optimizations and run it, you will most likely get a segmentation fault (or a similar error, depending on your platform) quite quickly. The application runs out of stack space: each of the roughly two million nested calls to f1 and f2 adds a stack frame, and none of them returns until n reaches 0.

In Lisp you can solve this problem by rewriting functions to return continuations instead of values. In C you can simulate continuations by the following means:
1. Making functions operate on pointers to variables instead of the variables themselves, which lets the application preserve their values between function calls.
2. Returning a pointer to the next function to be called instead of calling it directly, so that each call returns before the next one starts and the stack does not grow.

The rewritten f1 and f2 look as follows:
#include <stdio.h>

typedef void *(*Fun)(int *n);

void *f1(int *n);
void *f2(int *n);

void *f1(int *n) {
  printf("%d\n", *n);
  if (*n == 0) {
    printf("Blastoff!\n");
    return NULL;
  } else {
    return f2;
  }
}

void *f2(int *n) {
  *n = *n - 1;
  return f1;
}

int main() {
  int n = 1000000;
  Fun f = f1;
  while (f != NULL) {
    f = (Fun)f(&n);
  }
  return n;
}
Now the main function iteratively goes through subsequent function calls until the halting condition (a NULL return value) is met. At the end of the iteration the variable n holds 0, which main returns as the program's exit status.
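The code above converts function pointers to void * and back. gcc accepts this, but ISO C does not guarantee that such conversions work, and compiling with -pedantic produces warnings. A minimal sketch of a portable variant, assuming it is acceptable to wrap the next function in a struct (struct step and run_trampoline are names made up for this example):

```c
#include <stddef.h>
#include <stdio.h>

/* ISO C cannot declare a function that returns a pointer to its own
   type, so the next step is wrapped in a struct instead of a void *. */
struct step {
    struct step (*next)(int *n);  /* NULL means "stop" */
};

static struct step f1(int *n);
static struct step f2(int *n);

static struct step f1(int *n) {
    struct step s;
    printf("%d\n", *n);
    if (*n == 0) {
        printf("Blastoff!\n");
        s.next = NULL;
    } else {
        s.next = f2;
    }
    return s;
}

static struct step f2(int *n) {
    struct step s;
    *n = *n - 1;
    s.next = f1;
    return s;
}

/* The trampoline: keep calling whatever step the previous one returned.
   Returns the final value of n (0 when the countdown completes). */
int run_trampoline(int start) {
    int n = start;
    struct step s;
    s.next = f1;
    while (s.next != NULL)
        s = s.next(&n);
    return n;
}
```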

As a side note, gcc can optimize mutually recursive functions quite effectively. If you compile the first example with gcc -O2, the resulting code will not crash, because the compiler turns the tail calls between f1 and f2 into jumps (jmp) instead of call instructions, so the stack does not grow.

P.S. I would like to thank Johnny, my colleague and true programming expert, who inspired me to write this post. I was also inspired by Carlos Oliveira's post on trampolines.

Thursday, July 15, 2010

Advanced recursion in newLISP

In my recent post about newLISP I mentioned that it does not support tail call optimization. In fact, many Lisps don't. As Bill Six pointed out, even the ANSI Common Lisp standard does not require implementations to perform tail-call elimination (unlike Scheme), although it seems that all well-known ANSI Common Lisp compilers do it anyway.

I was wondering whether there is a way to circumvent this problem, and the first solution I found was the memoize macro described in Code Patterns in newLISP, an excellent piece of online documentation for newLISP:
(define-macro (memoize mem-func func)
  (set (sym mem-func mem-func)
    (letex (f func  c mem-func)
      (lambda ()
        (or (context c (string (args)))
        (context c (string (args)) (apply f (args))))))))
You can apply this macro to any function with any number of arguments. The trick is that the result of each call is cached in memory, keyed by the call's arguments, so a later call with the same arguments returns the stored value instead of recomputing it. This can speed up your application tremendously, as you can observe by comparing the execution times of these two Fibonacci functions:
(define (fibo n)
  (if (< n 2) 1
    (+ (fibo (- n 1)) (fibo (- n 2)))))

(memoize fibo-m
  (lambda (n)
    (if (< n 2) 1
      (+ (fibo-m (- n 1)) (fibo-m (- n 2))))))

(time (fibo 35))
(time (fibo-m 35))
On my laptop (fibo 35) took 12.98 seconds to execute, while (fibo-m 35) executed in 0.016 milliseconds.
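The same caching idea can be sketched in C. This is a minimal sketch, not a port of the macro: instead of a newLISP context keyed by the argument string, it assumes a fixed-size array indexed by n (fib_cache, FIB_MAX and fibo_m are names made up for this example):

```c
#include <stdint.h>

#define FIB_MAX 90  /* fibo(90) still fits in 64 bits */

/* Cache indexed by n; 0 means "not computed yet", which is safe
   because every fibo value is at least 1. */
static uint64_t fib_cache[FIB_MAX + 1];

/* Same definition as fibo above: fibo(0) = fibo(1) = 1.
   Returns 0 for n > FIB_MAX, which is outside the cache. */
uint64_t fibo_m(int n) {
    if (n < 2)
        return 1;
    if (n > FIB_MAX)
        return 0;
    if (fib_cache[n] == 0)
        fib_cache[n] = fibo_m(n - 1) + fibo_m(n - 2);
    return fib_cache[n];
}
```

Unlike the newLISP macro, this cache serves one function with one integer argument; a general version would need a hash table keyed by the arguments, which is the role the context plays in memoize.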

Unfortunately, the memoize macro cannot help with mutual recursion. A classic example of such recursion looks as follows:
(define (f1 n)
  (println n)
  (if (= n 0)
    (println "Blastoff!")
    (f2 n)))

(define (f2 n)
  (f1 (- n 1)))
Running (f1 1000) is enough to make newLISP run out of stack, not to mention bigger numbers. What happens if we define "memoized" versions of f1 and f2? Let's see:
(memoize f1
  (lambda (n)
    (println n)
    (if (= n 0)
      (println "Blastoff!")
      (f2 n))))

(memoize f2
  (lambda (n)
    (f1 (- n 1))))
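Again, running (f1 1000) immediately exhausts newLISP's stack. This is because memoize stores a result only after the call returns, so the first evaluation still nests every call to f1 and f2 on the stack.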

A solution to this problem is a technique called trampolining. On his blog, Bill Clementson not only gave an excellent explanation of trampolines but also provided a Common Lisp implementation, which inspired me to write a newLISP version:
(define (trampoline fun arg)
  (catch
    (while true
      (let ((run (apply fun arg)))
        (setf fun (first run) arg (rest run)))) 'result)
  result)
A trampoline repeatedly calls the next step returned by the previous one, so the stack does not grow no matter how many steps are taken. However, to use the trampoline, each function must return a continuation (the next function to call, together with its arguments) instead of a value. Below are the aforementioned functions modified to use the trampoline; the final result is delivered with throw, which the catch in trampoline stores in result:
(define (f1 n)
  (println n)
  (if (= n 0)
    (throw "Blastoff!")
    (list f2 n)))

(define (f2 n)
  (list f1 (- n 1)))
Now you can test it with:
(trampoline f1 '(1000))
(trampoline f1 '(10000))
(trampoline f1 '(100000))
...
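The newLISP version passes state along in the continuation itself, rather than through a pointer to a shared variable as in the C version of this technique. A minimal C sketch of that style, assuming a struct that holds the next function, its argument, and a final result standing in for throw (struct bounce and trampoline are names made up for this example):

```c
#include <stddef.h>
#include <stdio.h>

/* A continuation: the next function to run plus its argument, like
   (list f2 n). When fun is NULL the trampoline stops and returns
   result, which plays the role of the thrown value. */
struct bounce {
    struct bounce (*fun)(int arg);
    int arg;
    const char *result;
};

static struct bounce f2(int n);

static struct bounce f1(int n) {
    struct bounce b = { NULL, 0, NULL };
    printf("%d\n", n);
    if (n == 0) {
        b.result = "Blastoff!";  /* like (throw "Blastoff!") */
    } else {
        b.fun = f2;              /* like (list f2 n) */
        b.arg = n;
    }
    return b;
}

static struct bounce f2(int n) {
    struct bounce b = { f1, n - 1, NULL };  /* like (list f1 (- n 1)) */
    return b;
}

/* Call fun(arg), then whatever it returns, until a step returns no
   next function. The C stack depth stays constant. */
const char *trampoline(struct bounce (*fun)(int), int arg) {
    struct bounce b = { fun, arg, NULL };
    while (b.fun != NULL)
        b = b.fun(b.arg);
    return b.result;
}
```

For example, trampoline(f1, 100000) counts down to 0 and returns "Blastoff!" without growing the stack.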
Have fun!