Sunday, November 22, 2009

MagLev public alpha

Gemstone has just announced an alpha version of their concurrent Ruby engine, MagLev, which is now available for download. MagLev is based on Gemstone's Smalltalk virtual machine and supports 64-bit Linux, Mac OS X and Solaris x86 operating systems. There are no plans for a 32-bit version of MagLev.
MagLev does not support Rails yet, but neither does Fabio Kung's JMagLev. However, MagLev has an advantage over Fabio's project: Gemstone is determined to create an enterprise-class product, while JMagLev was just a demonstration of the power of Terracotta and does not seem to be developed any further. The next step for Gemstone will probably be to implement Rails support and allow RoR applications to run in a clustered environment, just as Grails applications can run on Terracotta.

Friday, November 20, 2009

Erlang project crawler

Today I received an email from Erlang Training and Consulting Ltd. - the owner of the popular Erlang community site Trapexit - announcing its own crawler for open source Erlang projects. The crawler gathers information on open source Erlang projects from a number of code repositories, such as GitHub, Bitbucket, SourceForge and Google Code. At the time of writing it includes information on 1228 projects. The number may not be impressive, but it is good to have information about the most interesting open source Erlang projects gathered in one place.

Sunday, September 6, 2009

Top gear(man)

Gearman is an open source project providing a flexible and universal framework for writing distributed applications. It differs from similar projects in its ease of use and in the number of language bindings it provides: C, C++, Java, Perl, PHP and Python. In fact, Gearman has a simple command-line client that allows you to start jobs using any language you want - all you need to do is provide the client with input data and then fetch its output. The Gearman API is very simple and consistent, and makes writing distributed applications really easy, quick and fun.

The Gearman architecture is equally simple: it consists of job servers that accept task requests from clients, forward them to workers and send the results back to the clients. Each worker can be connected to many job servers, and a client can choose which job server to use, so there is no single point of failure that could bring down the whole cluster. Job servers keep their own queues and, in case of a worker failure, can reassign tasks to other workers. According to High Scalability, Gearman has been successfully used by LiveJournal, Yahoo! and Digg (which claims to run 300,000 jobs a day through Gearman without any issues).
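The queue-and-reassign idea can be illustrated with a toy, in-process model. This is a hypothetical Python 3 sketch, not the Gearman API: the JobServer class and its methods are made up for the example, and real clients and workers talk to gearmand over the network through libgearman or one of the language bindings. It only shows that a job whose worker fails is handed to the next worker registered for the same function.

```python
from collections import deque


class JobServer:
    """Toy, in-process model of a Gearman-style job server (not the Gearman API)."""

    def __init__(self):
        self.queue = deque()  # pending (function name, argument) pairs
        self.workers = {}     # function name -> workers that registered it

    def register(self, func_name, worker):
        self.workers.setdefault(func_name, []).append(worker)

    def submit(self, func_name, arg):
        self.queue.append((func_name, arg))

    def run(self):
        """Hand each queued job to a worker; if a worker fails, try the next one."""
        results = []
        while self.queue:
            func_name, arg = self.queue.popleft()
            for worker in self.workers.get(func_name, []):
                try:
                    results.append(worker(arg))
                    break
                except ConnectionError:
                    continue  # worker died: reassign the job to the next worker
            else:
                raise LookupError(f"no live worker for {func_name!r}")
        return results


def dead_worker(arg):
    raise ConnectionError("worker went away")


server = JobServer()
server.register("reverse", dead_worker)
server.register("reverse", lambda s: s[::-1])
for word in ("gear", "man"):
    server.submit("reverse", word)
print(server.run())
```

Registering the failing worker first forces every job through the reassignment path, so both jobs end up on the second worker.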

I decided to try out Gearman at home, and I must say that it was a really pleasant experience. I wrote a simple C++ worker and an even simpler Python client. The worker computes the Fibonacci number for a given n recursively:

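#include <cstring>
#include <cstdlib>
#include <iostream>
#include <sstream>
#include <string>
#include <unistd.h>        // getopt()
#include <netinet/in.h>    // in_port_t
#include <libgearman/gearman.h>

using namespace std;

void *fib_worker(gearman_job_st *job, void *cb_arg, size_t *result_size, gearman_return_t *ret_ptr);
long fib(long n);
static void usage(const char *name);

int main(int argc, char *argv[])
{
    int c;
    const char *host = "127.0.0.1";
    in_port_t port = 0;    // 0 selects libgearman's default job server port
    gearman_worker_st worker;

    // Parse -h <host> and -p <port>.
    while ((c = getopt(argc, argv, "h:p:")) != -1) {
        switch (c) {
        case 'h':
            host = optarg;
            break;
        case 'p':
            port = (in_port_t) atoi(optarg);
            break;
        default:
            usage(argv[0]);
            exit(1);
        }
    }

    // The worker takes no positional arguments.
    if (argc != optind) {
        usage(argv[0]);
        exit(1);
    }

    gearman_worker_create(&worker);
    gearman_worker_add_server(&worker, host, port);
    // Register the "fib" function with a 10-second timeout.
    gearman_worker_add_function(&worker, "fib", 10, fib_worker, NULL);

    // Process jobs one after another; stop on the first error.
    while (true) {
        gearman_return_t ret = gearman_worker_work(&worker);
        if (ret != GEARMAN_SUCCESS) {
            cerr << "gearman_worker_work: " << gearman_worker_error(&worker) << endl;
            break;
        }
    }

    gearman_worker_free(&worker);
    return 1;
}

// Called by libgearman for every "fib" job: reads n from the workload
// and returns fib(n) as a heap-allocated decimal string.
void *fib_worker(gearman_job_st *job, void *cb_arg, size_t *result_size, gearman_return_t *ret_ptr) {
    // The workload is not NUL-terminated, so copy exactly workload_size bytes
    // (this also avoids overflowing a fixed-size buffer).
    string input((const char *) gearman_job_workload(job), gearman_job_workload_size(job));
    long n = atol(input.c_str());

    ostringstream os;
    os << fib(n);
    string s = os.str();

    *result_size = s.size();
    *ret_ptr = GEARMAN_SUCCESS;
    return strdup(s.c_str());
}

// Deliberately naive recursion to keep the CPU busy; fib(0) = fib(1) = 1.
long fib(long n) {
    if (n < 2) {
        return 1;
    } else {
        return fib(n - 2) + fib(n - 1);
    }
}

static void usage(const char *name) {
    cout << "Usage: " << name << " [-h <host>] [-p <port>]" << endl;
    cout << "\t-h <host> - job server host" << endl;
    cout << "\t-p <port> - job server port" << endl;
}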
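The naive recursion is what makes each job CPU-heavy. The Python 3 sketch below (not part of the original download) mirrors the worker's fib() and counts how many calls it makes: evaluating fib(n) this way takes exactly 2 * fib(n) - 1 calls, so the cost grows as fast as the result itself, and the single fib(44) job alone needs about 2.27 billion calls.

```python
from functools import lru_cache


def fib(n):
    """Same recursion as the C++ worker: fib(0) = fib(1) = 1."""
    return 1 if n < 2 else fib(n - 2) + fib(n - 1)


@lru_cache(maxsize=None)
def calls(n):
    """How many fib() invocations the plain recursion makes for fib(n)."""
    return 1 if n < 2 else 1 + calls(n - 2) + calls(n - 1)


for n in (10, 20, 25):
    print(f"fib({n}) = {fib(n)}, recursive calls = {calls(n)}")
```

The call count is memoized, so it can be evaluated for large n even though the recursive fib itself cannot.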

The Python client prepares a set of jobs for a sequence of n values, runs them simultaneously through a job server and sums up the results:

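import optparse
from gearman import GearmanClient, Task, Taskset

parser = optparse.OptionParser()
parser.add_option('--host', default='127.0.0.1', help="gearman job server host")
parser.add_option('-n', '--num', type='int',
                  help="compute fib(1) .. fib(NUM - 1) and sum them up")
(opts, args) = parser.parse_args()
if opts.num is None:
    parser.error("--num is required")

client = GearmanClient([opts.host])

# One "fib" task per n; the workload is sent to the worker as a string.
ts = Taskset()
for i in range(1, opts.num):
    ts.add(Task(func="fib", arg=str(i)))

# Submit all tasks at once and wait until every result has arrived.
client.do_taskset(ts)

total = 0
for task in ts.values():
    total += int(task.result)

print(total)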
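Since the distributed run must produce exactly the same answer as a sequential one, it can help to compute the expected total locally. A minimal Python 3 sketch (not part of the original download), following the worker's convention fib(0) = fib(1) = 1 and the client's range(1, NUM); the function names are illustrative. For -n 45, the value used in the run below, it gives 2971215071.

```python
def fib_iter(n):
    """Iterative version of the worker's fib(): fib(0) = fib(1) = 1."""
    a, b = 1, 1
    for _ in range(n):
        a, b = b, a + b
    return a


def expected_sum(num):
    """The total the client prints for --num NUM: fib(1) + ... + fib(NUM - 1)."""
    return sum(fib_iter(i) for i in range(1, num))


print(expected_sum(45))
```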

You can download the source code of both the worker and the client here. After you compile and install Gearman in the traditional way:

./configure
make
sudo make install
sudo ldconfig

install the Python extension with:
easy_install gearman

Then compile the C++ example with:
make

Then you can run a job server as a daemon:
gearmand -d -L 127.0.0.1

or in debug mode:
gearmand -vv -L 127.0.0.1

Next, run a couple of Gearman workers:
./GearmanWorker -h 127.0.0.1

And the Python client:
python GearmanClient.py --host 127.0.0.1 -n 45

On a single machine it makes sense to run at most as many workers as there are CPUs (or CPU cores) available. On a network cluster you can run more job servers and workers (and clients) accordingly.
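Starting one worker per CPU can be scripted. A hypothetical Python 3 helper, assuming the compiled binary is ./GearmanWorker in the current directory, as in the commands above; worker_commands and start_workers are illustrative names. Note that os.cpu_count() reports logical CPUs, so a Hyper-Threading core counts twice.

```python
import os
import subprocess


def worker_commands(host, count=None, binary="./GearmanWorker"):
    """One command line per worker; by default one worker per logical CPU."""
    if count is None:
        count = os.cpu_count() or 1  # cpu_count() may return None
    return [[binary, "-h", host] for _ in range(count)]


def start_workers(host, count=None):
    """Launch the workers as background processes and return their handles."""
    return [subprocess.Popen(cmd) for cmd in worker_commands(host, count)]
```

For example, `procs = start_workers("127.0.0.1")` starts the workers, and calling `p.terminate()` on each handle stops them again.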

I ran some tests with the client and worker above, using my home laptop and an Intel Atom-based net-top connected over a local network. With a single worker on the laptop, computing the sum with -n 45 took 66.955 seconds; with two laptop workers it took 35.702 seconds, and adding a remote worker on the net-top reduced the total time to 25.593 seconds. Adding more workers didn't reduce the computation time; it even slowed the cluster down slightly, which is quite understandable, as the number of workers exceeded the number of free CPUs (the Intel Atom in fact has only one physical core, although applications see it as a dual-core CPU).
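The speedups behind these timings can be recomputed, and a little arithmetic suggests a second limit besides the CPU count. Assuming that a job's cost is proportional to its number of recursive calls (2 * fib(n) - 1), that a single job cannot be split across workers, and that the largest job runs on a laptop core, the fib(44) job alone accounts for about 38% of all the work. No number of workers can then finish faster than that one job, which caps the speedup at about 2.62x. The measured 2.62x with three workers sits right at that cap; this is a hedged back-of-the-envelope estimate in Python 3, not something the measurements above prove.

```python
# Wall-clock times measured above for -n 45, in seconds.
times = {
    "1 laptop worker": 66.955,
    "2 laptop workers": 35.702,
    "2 laptop workers + 1 net-top worker": 25.593,
}
baseline = times["1 laptop worker"]
speedups = {setup: baseline / t for setup, t in times.items()}


def fib(n):
    """Iterative version of the worker's fib(): fib(0) = fib(1) = 1."""
    a, b = 1, 1
    for _ in range(n):
        a, b = b, a + b
    return a


# A naive recursive fib(n) makes 2 * fib(n) - 1 calls; use that as each job's cost.
costs = [2 * fib(n) - 1 for n in range(1, 45)]  # the client's jobs for -n 45
largest_share = max(costs) / sum(costs)
speedup_bound = 1 / largest_share

for setup, s in speedups.items():
    print(f"{setup}: {s:.2f}x")
print(f"largest job share: {largest_share:.3f}, speedup bound: {speedup_bound:.2f}x")
```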

Sunday, July 5, 2009

Jabase: Jabber cluster on HBase

Java has often been compared with Erlang by Erlang advocates, who emphasize Erlang's advantages over Java in thread creation and message passing. Some even claim that Erlang could be Java's successor in concurrent programming. Of course such comparisons and benchmarks have some value, but the truth is that Erlang has never been, and never will be, a competitor to Java. The reason for this is simple, and was perfectly explained by Dennis Byrne in his article "Integrating Java and Erlang":
Java and Erlang are not mutually exclusive, they complement each other. I personally have learned to embrace both because very few complex business problems can be modeled exclusively from an object oriented or functional paradigm.

Integration and interoperability are now the key words that make the modern IT business go round. The Erlang team understands this very well, too: they created Jinterface, a set of Java classes for communicating with Erlang.

Jinterface is also the key element that allowed me to build a highly scalable Jabber cluster with ejabberd as the XMPP server and the HBase distributed database as storage. Ejabberd is a distributed Jabber server written in Erlang, but unfortunately Erlang's native storage, Mnesia, can't handle large amounts of data. To overcome this limitation, ejabberd provides ODBC drivers for MySQL, MS SQL and PostgreSQL, but that's only a partial solution to the scalability problem: first, the whole ejabberd cluster still uses a single database instance as data storage, and second, user sessions are still kept in Mnesia.

Jabase is a set of middleware components written in Erlang and Java that provides a communication layer between the ejabberd XMPP server and the HBase distributed database. While ejabberd handles communication between users and server instances, HBase provides a highly scalable, distributed database that stores large amounts of data and serves them efficiently. Additionally, Java instances are responsible for caching user sessions and providing efficient methods of serving and searching session data, while Erlang code ensures session data integrity across Jabber server instances. The Jabase architecture looks like this:

The source code of Jabase has been released under the GPL, and the project website contains a manual describing how to compile it and set up a simple Jabber cluster based on Jabase. However, technical support is provided exclusively by Division-by-Zero, for which I built this software. If you have any questions or are interested in using Jabase in your company, please contact Division-by-Zero.
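To illustrate the kind of lookup a session cache like the one described above has to serve, here is a hypothetical in-memory index in Python 3. It is not Jabase's Java code: the class, its methods and the node names are made up for the example. It maps a user's bare JID (user@host) to that user's open sessions, one per resource, together with the ejabberd node handling each session, so all of a user's sessions can be found with a single lookup.

```python
from collections import defaultdict


class SessionCache:
    """Hypothetical in-memory index of XMPP sessions, keyed by bare JID."""

    def __init__(self):
        # bare JID ("user@host") -> {resource: name of the ejabberd node serving it}
        self._by_user = defaultdict(dict)

    @staticmethod
    def _split(full_jid):
        bare, _, resource = full_jid.partition("/")
        return bare, resource

    def open(self, full_jid, node):
        """Record a new session, e.g. open("alice@example.com/home", "ejabberd@node1")."""
        bare, resource = self._split(full_jid)
        self._by_user[bare][resource] = node

    def close(self, full_jid):
        """Forget a session; drop the user entry once no sessions are left."""
        bare, resource = self._split(full_jid)
        sessions = self._by_user.get(bare)
        if sessions is not None:
            sessions.pop(resource, None)
            if not sessions:
                del self._by_user[bare]

    def lookup(self, bare_jid):
        """Return a copy of {resource: node} for all open sessions of one user."""
        return dict(self._by_user.get(bare_jid, {}))


cache = SessionCache()
cache.open("alice@example.com/home", "ejabberd@node1")
cache.open("alice@example.com/work", "ejabberd@node2")
print(cache.lookup("alice@example.com"))
```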

Wednesday, March 4, 2009

Incoming Revolution: Clojure + Terracotta

For some time I have been working quite extensively with Java and Java-related technologies in addition to all the Erlang and functional stuff I do every day, and I must say that I am really impressed with what is going on in the area where both worlds overlap. A few months ago I was experimenting with JScheme running on Terracotta, but, as I told Ari from Terracotta Inc., who became interested in the project, combining their product with Clojure would be much more interesting. I knew that some people had already been thinking about it.
Not much time has passed since then and, guess what, Paul Stadig announced on his blog that he had managed to run Clojure code on Terracotta. Today the same guy made me look for my jaw on the floor: he made the whole Clojure environment (together with the REPL) work on Terracotta! Now imagine concurrent Clojure applications using Software Transactional Memory distributed across a computer network through Terracotta: you can build massive software clusters that work with incredible performance; you can add Hadoop (distributed file system) and HBase (distributed database) and build a system that can handle hundreds of thousands of parallel operations and store petabytes of data; you can scale your system up and down just by adding or removing machines from the cluster. And with modern cloud computing services, like AWS, you can build a large computation cluster or a social networking website on a relatively small budget.
Basically, you don't need much money to start another Facebook. Your imagination and programming skills are your only limit. Good luck!

Thursday, November 13, 2008

JMagLev: Terracotta based MagLev for Ruby

In one of my previous posts I wrote about clustering solutions for Ruby. One of them was MagLev, based on Gemstone's Smalltalk virtual machine; the second one was based on Terracotta. The latter solution seems more appealing, since Terracotta is open source and lets you do your own experiments (like this, or this, or even this). Unfortunately, all the solutions I have seen so far (including my own) for JVM-based languages other than Java depended on a special library that interfaced with Terracotta through the com.tc.object.bytecode.ManagerUtil class, and were not transparent to the programmer. Until now.
Recently, Fabio Kung took it a step further and patched JRuby to behave like MagLev, transparently sharing objects across multiple JRuby runtimes. It seems that MagLev got a strong competitor before it even managed to hit the market. Have a look at the demo; it looks as impressive as what Avi Bryant from Gemstone showed at RailsConf 2008.

Thursday, November 6, 2008

Distributed processing with JScheme and Terracotta

In my post about clustering JScheme with Terracotta I presented how to share Java objects through Terracotta using JScheme. But clustering is not only about sharing data. It is also about parallel (and possibly distributed) task processing.

One of the fundamental assumptions of Scheme (and other Lisps) is that code is data. Programs are just collections of evaluable expressions and thus you can represent data as code, and vice versa. This is exactly what you need when you want to distribute code over a system designed primarily to share data.

I prepared a simple system based on JScheme that allows distributed, concurrent code processing over a Terracotta cluster. It uses the concept of independent nodes (borrowed from Erlang). Each node polls a shared list to find a new expression to evaluate. Once it finds a job to be done, it evaluates the expression using a JScheme evaluator (an instance of the jscheme.JScheme class) and writes the result back to the list. The client process that initiated the task reads back the result and returns it.
Since all nodes are independent entities, you can start as many of them as you need and use them concurrently. But in most cases the optimal number of nodes is equal to the number of CPUs (or CPU cores) to be used on each machine connected to Terracotta. So if you have 2 computers, each with a quad-core CPU, and want to use only half of their power, you can start 2 nodes on each of them. If you want to use them to the full, you should use 8 nodes, 4 per machine, and so on. You can start the nodes on a single machine using a single JScheme shell or multiple ones, it's up to you. For me, a single Scheme REPL seems to be the most convenient option.
Client jobs are started through tc-map. It's a function similar to the standard map, but it takes an additional argument: a list of nodes to use for the job. Unfortunately, the system is not fault tolerant, so if one of the nodes dies while doing the job, the whole processing task hangs. The only way out then is either to restart the dead node or to evaluate the whole tc-map expression again.
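The design described above can be modeled in a few dozen lines of Python 3, with threads standing in for nodes and a lock-protected list standing in for the Terracotta-shared list. This is a hypothetical sketch, not the jstc.scm code: the names SharedJobs, start_node and tc_map are illustrative, functions are looked up by name in a per-node table instead of being evaluated from source strings, and jobs are assigned to nodes round-robin, which is an assumption about how the work might be split. The 20 ms `delay` plays the role of JSTC_EVAL_DELAY.

```python
import threading
import time


class SharedJobs:
    """Stand-in for the Terracotta-shared job list: a plain list plus a lock."""

    def __init__(self):
        self.lock = threading.Lock()
        self.jobs = []  # dicts with keys: node, func, arg, result, done


def start_node(shared, name, env, stop, delay=0.02):
    """Start a polling node: every `delay` seconds, evaluate jobs addressed to `name`."""

    def loop():
        while not stop.is_set():
            with shared.lock:
                pending = [j for j in shared.jobs if j["node"] == name and not j["done"]]
            for job in pending:
                value = env[job["func"]](job["arg"])  # function looked up by name
                with shared.lock:
                    job["result"], job["done"] = value, True
            time.sleep(delay)

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread


def tc_map(shared, nodes, func, args, delay=0.02):
    """Like map(), but spreads the calls round-robin over the named nodes."""
    jobs = [{"node": nodes[i % len(nodes)], "func": func, "arg": a,
             "result": None, "done": False} for i, a in enumerate(args)]
    with shared.lock:
        shared.jobs.extend(jobs)
    mine = {id(j) for j in jobs}
    while True:  # like the original, this waits forever if a node dies
        with shared.lock:
            if all(j["done"] for j in jobs):
                shared.jobs[:] = [j for j in shared.jobs if id(j) not in mine]
                return [j["result"] for j in jobs]
        time.sleep(delay)


def fib(n):
    """Same definition as the Scheme one: fib(0) = 0, fib(1) = 1."""
    return n if n < 2 else fib(n - 1) + fib(n - 2)


stop = threading.Event()
shared = SharedJobs()
for node in ("test1", "test2"):
    start_node(shared, node, {"fib": fib}, stop)
result = tc_map(shared, ["test1", "test2"], "fib", list(range(1, 21)))
stop.set()
print(result)
```

Results are returned in the order of the input list regardless of which node finished first, which is what makes tc-map a drop-in replacement for map.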

OK, enough theory, let's do some real work. First you need to download the library. The online folder contains the library itself (jstc.scm), a sample Terracotta configuration (tc-config.xml) and some tests. After you get the library and the configuration file, start the Terracotta server (I described the whole procedure in detail previously). If you run the Terracotta server on a remote machine, you should also edit the tc-config.xml file and change the server host to the IP address of the Terracotta host. Now you can start JScheme through the Terracotta bootstrap:

java -Xbootclasspath/p:[terracotta boot jar] -Dtc.config=tc-config.xml -Dtc.install-root=[terracotta install dir] -jar jscheme.jar

You can find the boot jar in the lib/dso-boot folder of your Terracotta installation directory. If it isn't there, you can generate it with Terracotta's make-boot-jar.sh script.
Now you can load the library with:

(load "jstc.scm")

and start playing with it. For starters, let's run two nodes:

(start-node "test1")
(start-node "test2")

and define a helper function to generate a list of integers from a specified range:

(define (range min max) (let loop ((x max) (l '())) (if (< x min) l (loop (- x 1) (cons x l)))))

Now we need to "teach" the running evaluators the Fibonacci function:

(tc-load (list "test1" "test2") "(define (fib n) (cond ((= n 0) 0) ((= n 1) 1) (else (+ (fib (- n 1)) (fib (- n 2))))))" )

and we are ready to spread a test job across the nodes:

(tc-map (list "test1" "test2") "fib" (range 1 20))

After a few seconds you should receive the following list:

(1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181 6765)

You can use the time function to compare the computation time with that of a single node or of a regular, sequential map:

(time (tc-map (list "test1" "test2") "fib" (range 1 20)))
(time (tc-map (list "test1") "fib" (range 1 20)))
(define (fib n) (cond ((= n 0) 0) ((= n 1) 1) (else (+ (fib (- n 1)) (fib (- n 2))))))
(time (map fib (range 1 20)))

Note that the function passed to map is a regular Scheme expression, while tc-map expects it as a string.

It is quite possible that you will get no gain over sequential processing with this library when using fewer than 3 nodes running on 3 cores. The main reason is that Terracotta itself introduces some overhead. The other is that nodes poll Terracotta for job lists at 20 ms intervals. Those intervals are necessary if you don't want to spend all the CPU power on polling loops and leave none for the jobs. You can adjust them by changing the value of JSTC_EVAL_DELAY.

I did some tests and I must say that the results surprised me. On my home laptop (Core2 Duo T5450 1.66 GHz, 2 cores, 2GB RAM) the results looked like this:

(time (map fib (range 1 25))) - 17234 msec
(time (tc-map (list "test1") "fib" (range 1 25))) - 29020 msec
(time (tc-map (list "test1" "test2") "fib" (range 1 25))) - 19620 msec

while on three servers (dual Xeon E5430 2.66 GHz, 8 cores, 8GB RAM each):

(time (map fib (range 1 25))) - 12687 msec
(time (tc-map (list "test1") "fib" (range 1 25))) - 22502 msec

but:

(time (tc-map (list "test1" "test2") "fib" (range 1 25))) - 25256 msec
(time (tc-map (list "test1" "test2" "test3") "fib" (range 1 20))) - 22355 msec

when I ran the tests on a single machine, and:

(time (tc-map (list "test1" "test2") "fib" (range 1 25))) - 14216 msec
(time (tc-map (list "test1" "test2" "test3") "fib" (range 1 20))) - 11538 msec

when each node was on a different machine.
On my laptop I got almost a 150% speedup by using two Terracotta nodes instead of one, but on a server machine two nodes actually slowed the tasks down. I could get faster job processing only by spreading nodes across different machines. Adding more nodes to the machines seemed to have no impact on the results, so I couldn't get past a 250% speedup factor.
Weird, isn't it?
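The ratios behind these observations can be recomputed from the (range 1 25) timings above; the three-node runs are left out of this Python 3 sketch because they were timed on (range 1 20). Computed this way, none of the (range 1 25) tc-map runs beats the plain sequential map on the same machine, which is consistent with the overhead explanation given earlier (Terracotta itself plus the 20 ms polling interval), although these few measurements cannot confirm it.

```python
# Times reported above for (range 1 25), in milliseconds.
laptop = {"map": 17234, "1 node": 29020, "2 nodes": 19620}
server = {"map": 12687, "1 node": 22502,
          "2 nodes, one machine": 25256, "2 nodes, two machines": 14216}


def speedups(times, baseline):
    """Speedup of every run relative to the run named `baseline` (>1 means faster)."""
    return {run: round(times[baseline] / t, 2) for run, t in times.items()}


print("laptop vs 1 node:", speedups(laptop, "1 node"))
print("laptop vs map:   ", speedups(laptop, "map"))
print("server vs 1 node:", speedups(server, "1 node"))
print("server vs map:   ", speedups(server, "map"))
```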