Resource synchronization on Hadoop clusters with ZooKeeper - Part II

Picking up right where I left off: GPUs are massively parallel in contrast to CPUs, so for some parallel workloads they are damn fast. The benchmarks you see around showing performance increases of over 100x are theoretically true. By theoretical I mean pure CPU vs. GPU computing power. In other words, for an infinitely running computation, it is possible to get 100x more results from a GPU than from a CPU core in the same amount of time. But as experienced GPGPU developers undoubtedly know, in practice things rarely happen that way.

First of all, only a small fraction of commercially meaningful computations run indefinitely. The first one that comes to my mind is calculating the digits of pi. That surely can make some money if you are into cryptography or something, but I think it is safe to say that it is both a niche and a dominated market. Another is fractal generation, and I have yet to meet anyone making money out of generating colorful images. Businesses sell results, and to get results, processes must end at some point.

Two well-known facts about finite processes are that they need some data beforehand and they output some data afterwards. That means the duration of any process is roughly the sum of its IO time and its computation time. A GPU may decrease computation time, but since it does not change IO time, the IO eats into your 100x expectations. (IO time actually increases in GPGPU processes, but that’s another story.) Depending on the type of problem, you may have to settle for 3x performance or less.
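To put rough numbers on this, here is a small sketch of the arithmetic. The 3-second IO and 7-second computation split is made up purely for illustration, and the class and method names are hypothetical.

```java
import java.util.Locale;

public class EffectiveSpeedup {

    /**
     * Overall speedup of a task when only its computation part is accelerated
     * and its IO time stays the same.
     */
    static double effectiveSpeedup(double ioSeconds, double computeSeconds, double computeSpeedup) {
        double before = ioSeconds + computeSeconds;
        double after = ioSeconds + computeSeconds / computeSpeedup;
        return before / after;
    }

    public static void main(String[] args) {
        // Hypothetical task: 3 s of IO plus 7 s of computation on a CPU core,
        // with the computation part running 100x faster on a GPU.
        double speedup = effectiveSpeedup(3.0, 7.0, 100.0);
        // 10 s / 3.07 s, so this prints 3.26x
        System.out.println(String.format(Locale.ROOT, "%.2fx", speedup));
    }
}
```

Even with a 100x faster computation step, the whole task ends up only a little over 3x faster, because the 3 seconds of IO are still there.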

So, as shown in Part I, if you configure your systems so that each resource (GPU) is occupied by only one process at any given time, you do not use the resources optimally. That is, if you set your MapReduce TaskTrackers’ maximum simultaneous map task count to the number of resources on the system, the JobTracker will wait for each task to finish before starting another one, and your resources will sit idle during the IO part of these tasks.

One solution is to use more than one process per resource. It is possible to start two processes for each resource and let one use the resource while the other does its IO. When the first one is done with the resource, it can signal the other to start. That way the resource never has to wait for IO to finish first.

This signaling mechanism fits perfectly with ZooKeeper’s watches. You can set a watch on a znode, and ZooKeeper will notify you when it changes. In this particular problem, the second process can set a watch on a common znode. When the first process is done with the resource, all it has to do is modify the znode to let the second process know it has finished. This is exactly what ResourceSynchronizer does. When you call .request(), it returns the next free resource; if there aren’t any free resources, it blocks until another process calls .release(). So the process stays blocked until a resource is available to it.
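The request/release behavior can be sketched inside a single JVM, without ZooKeeper at all. This is not the actual ResourceSynchronizer: the class LocalResourcePool and its tryRequest() helper are hypothetical, threads stand in for processes, and wait()/notifyAll() stand in for the znode watch and the znode modification.

```java
import java.util.HashSet;
import java.util.Set;

public class LocalResourcePool {
    private final String[] names;                       // resource names, in allocation order
    private final Set<String> taken = new HashSet<>();  // names currently held by someone

    public LocalResourcePool(String... names) {
        this.names = names.clone();
    }

    /** Returns the first free resource name, or null if every resource is taken. */
    public synchronized String tryRequest() {
        for (String name : names) {
            if (taken.add(name)) {
                return name;
            }
        }
        return null;
    }

    /** Blocks until a resource is free, then returns its name. */
    public synchronized String request() throws InterruptedException {
        String name;
        while ((name = tryRequest()) == null) {
            wait(); // plays the role of waiting on a watch
        }
        return name;
    }

    /** Frees a resource and wakes every waiter so they can retry. */
    public synchronized void release(String name) {
        if (taken.remove(name)) {
            notifyAll(); // plays the role of modifying the watched znode
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LocalResourcePool pool = new LocalResourcePool("res0", "res1");
        String first = pool.request();   // res0
        String second = pool.request();  // res1
        Thread waiter = new Thread(() -> {
            try {
                String got = pool.request(); // blocks while both resources are held
                System.out.println("waiter got " + got);
                pool.release(got);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        Thread.sleep(200);    // pretend to work with the resource
        pool.release(first);  // the waiter can now take res0
        waiter.join();
        pool.release(second);
    }
}
```

Waking every waiter on release and letting them race for the freed name is a deliberate simplification: the losers just go back to waiting.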

You can set the same pool/resource list and spawn some processes to see the effect. Say you set your resources as [“res0”, “res1”], and your resource-intensive procedure takes 10 seconds. If you start 3 processes within 10 seconds, the first 2 will get “res0” and “res1” respectively from their rs.request() calls, while the last one will wait until either of the first two calls rs.release().

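// Connect to the ZooKeeper ensemble and use the "/pool" znode with two named resources
ResourceSynchronizer rs = new ResourceSynchronizer(
    new ZooKeeper("zkensemble", 20000, null),
    "/pool", new String[]{"res0", "res1"});
log.warn("Requesting resource...");
// request() blocks until a resource is free and returns its name
log.warn("Got resource : " + rs.request() + ". Working...");
Thread.sleep(10000); // stand-in for the 10-second resource-intensive work
log.warn("Process Done! Releasing Resource");
rs.close(); // per the logs below, this releases the resource and closes the ZK connection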

Output of process #1

19:01:16,799 WARN  Run - Requesting resource...
19:01:16,824 INFO  ResourceSynchronizer - ZK ensemble connected
19:01:16,846 INFO  ResourceSynchronizer - Resource '/pool/res0' allocated
19:01:16,846 WARN  Run - Got resource : res0. Working... 
19:01:26,846 WARN  Run - Process Done! Releasing Resource
19:01:26,853 INFO  ResourceSynchronizer - Resource released
19:01:26,861 INFO  ResourceSynchronizer - ZK connection closed

Output of process #2

19:01:22,504 WARN  Run - Requesting resource...
19:01:22,518 INFO  ResourceSynchronizer - ZK ensemble connected
19:01:22,541 INFO  ResourceSynchronizer - Resource '/pool/res1' allocated
19:01:22,541 WARN  Run - Got resource : res1. Working... 
19:01:32,541 WARN  Run - Process Done! Releasing Resource
19:01:32,556 INFO  ResourceSynchronizer - Resource released
19:01:32,564 INFO  ResourceSynchronizer - ZK connection closed

Output of process #3

19:01:23,967 WARN  Run - Requesting resource...
19:01:23,992 INFO  ResourceSynchronizer - ZK ensemble connected
19:01:24,003 INFO  ResourceSynchronizer - No available resource, waiting... 
19:01:26,854 INFO  ResourceSynchronizer - Retrying to get another resource
19:01:26,871 INFO  ResourceSynchronizer - Resource '/pool/res0' allocated
19:01:26,871 WARN  Run - Got resource : res0. Working... 
19:01:36,871 WARN  Run - Process Done! Releasing Resource
19:01:36,886 INFO  ResourceSynchronizer - Resource released
19:01:36,894 INFO  ResourceSynchronizer - ZK connection closed

Output of process #4

19:01:26,802 WARN  tool.Run - Requesting resource...
19:01:26,823 INFO  ResourceSynchronizer - ZK ensemble connected
19:01:26,835 INFO  ResourceSynchronizer - No available resource, waiting... 
19:01:26,854 INFO  ResourceSynchronizer - Retrying to get another resource
19:01:26,879 INFO  ResourceSynchronizer - No available resource, waiting... 
19:01:32,556 INFO  ResourceSynchronizer - Retrying to get another resource
19:01:32,573 INFO  ResourceSynchronizer - Resource '/pool/res1' allocated
19:01:32,573 WARN  Run - Got resource : res1. Working... 
19:01:42,573 WARN  Run - Process Done! Releasing Resource
19:01:42,581 INFO  ResourceSynchronizer - Resource released
19:01:42,590 INFO  ResourceSynchronizer - ZK connection closed

These 4 outputs come from the same piece of code run 4 times, a few seconds apart. If you check the timestamps in the logs, the 1st and 2nd processes got res0 and res1 right after requesting them. The 3rd process took res0 right after the 1st one released it. The 4th process also tried for res0 when the 1st one released it but lost out to the 3rd, so it waited for the 2nd one to release res1. The timestamps show that each resource sat idle for under 20 milliseconds between release and reallocation.

I have also added a REUSE setting in the code that lets each resource be handed out a given number of times at once. For the previous example, if REUSE is set to 2, the first 4 processes will get “res0”, “res1”, “res0”, “res1”, and the fifth will wait for a resource to be freed up. With small modifications I am sure it can also be a solution to the IO-increasing properties of GPGPU processes, but that will probably require you to change the way you access your application, so I am not there yet.
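The allocation order with REUSE can be sketched in a single JVM as well. This is only a guess at the policy, not the actual implementation: the class ReusablePool is hypothetical, and picking the least-loaded resource (ties broken by list order) is an assumption chosen because it reproduces the “res0”, “res1”, “res0”, “res1” order described above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ReusablePool {
    private final int reuse;  // max simultaneous holders per resource
    private final Map<String, Integer> holders = new LinkedHashMap<>();  // name -> current holders

    public ReusablePool(int reuse, String... names) {
        this.reuse = reuse;
        for (String name : names) {
            holders.put(name, 0);
        }
    }

    /** Returns the least-loaded resource that still has a free slot, or null if all are full. */
    public synchronized String tryRequest() {
        String best = null;
        for (Map.Entry<String, Integer> e : holders.entrySet()) {
            boolean hasSlot = e.getValue() < reuse;
            if (hasSlot && (best == null || e.getValue() < holders.get(best))) {
                best = e.getKey();
            }
        }
        if (best != null) {
            holders.merge(best, 1, Integer::sum);
        }
        return best;
    }

    /** Blocks until some resource has a free slot. */
    public synchronized String request() throws InterruptedException {
        String name;
        while ((name = tryRequest()) == null) {
            wait();
        }
        return name;
    }

    /** Gives one slot of the named resource back and wakes the waiters. */
    public synchronized void release(String name) {
        Integer count = holders.get(name);
        if (count != null && count > 0) {
            holders.put(name, count - 1);
            notifyAll();
        }
    }

    public static void main(String[] args) {
        ReusablePool pool = new ReusablePool(2, "res0", "res1");
        // Prints res0, res1, res0, res1, then null because both resources are full.
        for (int i = 0; i < 5; i++) {
            System.out.println(pool.tryRequest());
        }
    }
}
```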

UPDATE: A colleague advised me to use “A semaphore implementation using ZooKeeper” as the title, which would be appropriate but not entirely correct. As a careful reader might notice, the mechanism is not binary nor does it use a counter. Instead, it holds the names of the resources it is supposed to allocate.

Notes

  1. agaoglu posted this