GPUs, Hadoop and Testing Scalability

As I have mentioned numerous times before, I am currently trying to get a GPU-powered image processing application to run on Hadoop. During development we used a cluster of 12 machines with one Nvidia GTX 480 each, but since we are launching in a few months, we had to run some tests on our production cluster of 25 machines with two Nvidia Tesla M2050s each. In this post, I’ll try to sum up the testing process; technical details will come later.

First, some reminders about our architecture. The image processing application (IPA) receives an array of images and returns an array of results (doubles). A reduceless MapReduce application divides the images stored in HBase into chunks and passes those chunks to IPA instances. Simply put, while it’s improbable for a single IPA to process thousands of images at once, the whole system is able to process millions of images in parallel.

What matters (on our end) is the number of images the IPA received and how long it took to return a result set. From those we calculate a basic metric: speed in images processed per second (ipps). We also calculate the same speed for the whole cluster, to see whether we can reach n × x ipps when a single IPA runs at x ipps and the cluster runs n IPAs in parallel (spoiler … we can!).

To put this in numbers, we measured the base IPA speed on a GTX 480. While the CPU of the system also affects it a bit, it runs at 19.46 ipps on average. Our cluster with 12 GTX 480s, on the other hand, runs at a total speed of 231 ipps, which is extremely close to 12 × 19.46 = 233.52 ipps! Looking at these numbers, we assumed our system scales linearly, so if we increased the number of GPUs to, say, 24, we would get 231 × 2 = 462 ipps.
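The linearity check above is simple arithmetic: the ideal cluster speed is n × x, and the ratio of measured to ideal speed shows how close to linear the scaling is. Below is a small sketch using this post's GTX 480 numbers. The class and method names are only illustrative. For the 12-GPU cluster the ratio comes out at about 98.9%.

```java
public class ScalingCheck {
    // Speed metric used throughout: images processed per second (ipps).
    public static double ipps(long images, double seconds) {
        return images / seconds;
    }

    // Ideal cluster speed if n IPAs, each running at x ipps, scale linearly.
    public static double idealClusterIpps(int n, double x) {
        return n * x;
    }

    // Measured speed as a fraction of the ideal one (1.0 = perfectly linear).
    public static double efficiency(double measuredIpps, int n, double x) {
        return measuredIpps / idealClusterIpps(n, x);
    }

    public static void main(String[] args) {
        double ideal = idealClusterIpps(12, 19.46);
        System.out.printf("ideal %.2f ipps, efficiency %.1f%%%n",
                ideal, 100 * efficiency(231, 12, 19.46));
    }
}
```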

With this assumption in mind, we measured the base IPA speed on a Tesla M2050, which is 14.80 ipps (yes, the Tesla M2050 is about 24% slower than the GTX 480). We expected a speed of 14.80 × 50 = 740 ipps on our production cluster with 50 Teslas. Our first result, 518 ipps, was nowhere near that. We started investigating…

After some lousy ideas putting the blame on the IPA folks and on node configurations, we took a step back and started questioning the way we were testing. We knew there were IO and Hadoop task management overheads, but they were negligible … for jobs containing large numbers of images. What we missed was that the definition of “large” differs between a cluster with 12 GPUs and one with 50 (!). We were testing both with 100,000 images, which could well have been a small number for the latter. We gradually increased the number of images to one million and…

we got close enough to the expected 740 ipps, with 709 ipps. MapReduce jobs in our system will process millions of images in production, which means the cluster will be fully utilized. If there were only a hundred thousand images, a large portion of the investment would have been wasted.
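One way to see why job size matters is a deliberately simple model. It is an assumption, not something we measured: every job pays a fixed overhead T0 (IO, Hadoop task setup) on top of the ideal processing time N / (n·x).

Fitting T0 to the 518 ipps run gives roughly 58 seconds, assuming that run processed 100,000 images. With the same T0, the model predicts about 710 ipps for one million images.

```java
public class FixedOverheadModel {
    // Throughput (images/s) of a job with `images` images on a cluster whose
    // ideal linear speed is idealIpps, assuming a fixed per-job overhead.
    public static double throughput(double images, double idealIpps, double overheadSec) {
        return images / (overheadSec + images / idealIpps);
    }

    // Solve the same model for the overhead, given one measured run.
    public static double overheadFrom(double images, double idealIpps, double measuredIpps) {
        return images / measuredIpps - images / idealIpps;
    }

    public static void main(String[] args) {
        double ideal = 50 * 14.80;                              // 740 ipps
        double t0 = overheadFrom(100_000, ideal, 518);          // model assumption
        System.out.printf("overhead ~ %.1f s%n", t0);
        for (double n : new double[]{100_000, 1_000_000, 10_000_000}) {
            System.out.printf("%,.0f images -> %.0f ipps%n", n, throughput(n, ideal, t0));
        }
    }
}
```

The prediction lands close to the 709 ipps we measured. A single fixed-overhead term is still a simplification, and real overheads also depend on the number of tasks.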

Lesson learned in scalability: You have to cut your coat according to your cloth. Or you shouldn’t buy more cloth than what would be necessary to cut your coat. Or … whatevs, you get the point.

Lesson learned in testing: Always test your systems, then test the hell out of them, and when those tests don’t satisfy you, change your tests and test again. It might cost some time, but it will save money.

Java web services without (explicit) code generation - with exception handling

Finally… it’s been a busy few weeks. In the first one I sat in front of the computer constantly, and in the others I was constantly on the move. I finally found some time to finish what I started. But there isn’t much of it, so I’ll keep this short.

Previously I talked about some funny web services stuff and finished with a problem concerning exception handling. In short: SOAP cannot transparently handle Java exceptions, so you cannot throw something on the server and expect the client to catch the same thing. You need some transformation.

In a longer form: SOAP has a thing called a SOAP fault, which is the closest thing you have to a Java exception, but in order to use it you have to accept some rules.

First of all, your exception has to be a checked one. Second, a SOAP fault is basically XML, so it can only carry things that can be parsed from or rendered into XML. This means you have to wrap your exception information in a JavaBean so it can easily be transformed into XML and back.

Looks good, with one little problem: what if you can’t get your exception information into a bean? Maybe your errors are in an enum or, even worse, you are using an interface to describe your error codes. Well, JAX-WS has nothing to offer there.

Another thing JAX-WS does not offer is decent developer documentation. You have to waste hours debugging to see which method gives you what. Except for type-safe exception handling, that is, because here goes:

MyException is the custom checked exception. For simplicity, it has two String parameters, but you can use a custom object implementing your error-code interface, an enum, or whatever you need. Just change the object instantiation on line 78 to suit you. Using this snippet is as simple as:

MyService port = getPort(new QName(MY_NS, "MYServicePort"), MyService.class);
port = JaxWsExceptionCatcher.catchOn(port);

And you can catch and process your exceptions just as if the service were on your classpath, still without code generation.
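To give a rough idea of how such a catcher can work, here is a minimal sketch. It is not the original JaxWsExceptionCatcher. A java.lang.reflect.Proxy wraps the port, and when a call fails with a fault, the proxy rebuilds MyException from the fault string.

To keep the sketch runnable without a JAX-WS runtime, a few stand-ins are assumptions:
- FaultException stands in for javax.xml.ws.soap.SOAPFaultException.
- The "code|detail" fault-string format is invented for the example.
- EchoService is a hypothetical service interface.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

public class ExceptionCatcherSketch {
    // Hypothetical checked exception with two String fields, like MyException above.
    public static class MyException extends Exception {
        public final String code;
        public final String detail;
        public MyException(String code, String detail) {
            super(code + ": " + detail);
            this.code = code;
            this.detail = detail;
        }
    }

    // Stand-in for the unchecked fault a SOAP client throws
    // (javax.xml.ws.soap.SOAPFaultException in a real JAX-WS client).
    public static class FaultException extends RuntimeException {
        public FaultException(String faultString) { super(faultString); }
    }

    // Hypothetical service interface, as it would ship in the shared jar.
    public interface EchoService {
        String echo(String s) throws MyException;
    }

    // Wraps a port in a dynamic proxy that turns faults back into MyException.
    // Assumes the port's class directly implements the service interface.
    @SuppressWarnings("unchecked")
    public static <T> T catchOn(T port) {
        InvocationHandler handler = (proxy, method, args) -> {
            try {
                return method.invoke(port, args);
            } catch (InvocationTargetException e) {
                Throwable cause = e.getCause();
                if (cause instanceof FaultException) {
                    String msg = cause.getMessage() == null ? "" : cause.getMessage();
                    String[] parts = msg.split("\\|", 2);   // assumed "code|detail"
                    MyException mapped = new MyException(parts[0], parts.length > 1 ? parts[1] : "");
                    // Rethrow as checked only where the interface method declares it.
                    for (Class<?> declared : method.getExceptionTypes()) {
                        if (declared.isInstance(mapped)) {
                            throw mapped;
                        }
                    }
                }
                throw cause;   // anything else propagates unchanged
            }
        };
        return (T) Proxy.newProxyInstance(port.getClass().getClassLoader(),
                port.getClass().getInterfaces(), handler);
    }
}
```

In a real client you would inspect SOAPFaultException.getFault() (a javax.xml.soap.SOAPFault) for its fault string or detail instead of parsing a message.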

Java web services without (explicit) code generation

I don’t know about you, but I hate code generation. Bytecode generation may sometimes be useful, but it kills debugging capabilities, so it should be avoided most of the time. As for source-code generation, I simply fail to see the need for it. If some 3rd party library is going to write the code I will run, why can’t I simply let the library do whatever it needs behind some sort of API?

Anyway, we all know the story. If you are using an external SOAP web service, you are kinda forced to generate (source) code. But most of us extend this approach and generate code for SOAP web services between modules of the same project. That has been entirely unnecessary since JAX-WS 2.0 (I guess; I’m not sure about the version). Instead, we can give JAX-WS the plain-old Java interface of our service and the WSDL URL and make it work for us.

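import java.net.MalformedURLException;
import java.net.URL;
import javax.xml.namespace.QName;
import javax.xml.ws.Service;

// Client-side stub: no generated classes, only the service interface My.
class MyService extends Service {
  public MyService() throws MalformedURLException {
    super(new URL("http://path/to/service?wsdl"),
        new QName("http://service.my.org/", "MyService"));
  }

  public My getMyPort() {
    return getPort(new QName("http://service.my.org/",
        "MyPort"), My.class);
  }
}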

The code above shows what’s necessary on the client side. The Service we extend is a class in the JAX-WS framework, and My is the interface of the service we are trying to use. This is the simplest example you will find when you google JAX-WS without code generation. But as always, no one’s making a living with hello-world applications.

Every module uses custom beans (complex types) in communication, so a single interface will not be enough (it would be if there were no complex types). JAX-WS will auto-generate transport classes but will not touch business-specific beans.

So what I came up with is to have the service-providing module publish a jar with the necessary beans and the web service interface. The service-consuming module declares a dependency on that artifact and goes on with its life. The jar actually contains half of what JAX-WS would generate. It is still ugly, but not because some magic library generated it: it’s ugly because some module developer wrote it ugly, so you can push him/her around.

Another upside is that, since you write the instantiation code (above) yourself, you can write it any way you like and, say, dependency-inject it using Guice.

The story does not end here, though. Now that you have (almost) isolated yourself from SOAP mechanics (using Guice and all), you may want your service provider’s exceptions to arrive untouched. Hold tight for the second post.

Using ivy and maven together

It’s not logical, highly unnecessary and probably expensive. But somehow we found ourselves in that environment anyway. The problem stemmed from the Eclipse/RCP dependency system being incompatible with virtually everything out there. We were using Ant/Ivy and were pretty happy with it, but our UI team found no easy way of building their application headlessly with it. Eclipse is trying to make use of Maven 3 with a thing called Tycho, but that’s another story. The point is, they were practically forced to use Maven, and so was I (we).

The problem is that the Eclipse project (M1), which is built using Maven, depends on a project (I1) which is built using Ivy. Since these projects are constantly evolving, the dependency is on a SNAPSHOT version. Add to that the oversight of choosing Nexus as our artifact repository manager, and we ended up unable to publish SNAPSHOTs with Ivy and depend on them with Maven.

We set M1’s updatePolicy to always and expected it to re-download the SNAPSHOT artifact of I1 on every change, but there is more than one way to detect a change. Ivy relies on the artifacts’ timestamps, while Maven uses external metadata (maven-metadata.xml) to determine whether a SNAPSHOT artifact has changed. But Ivy knows nothing about an external metadata file during publish/deploy, so there is nothing for Maven to use. (1)

Nexus can actually repair missing metadata files, but I think (not sure) it requires the artifacts to be deployed with uniqueVersion (those funny timestamp-like strings replacing SNAPSHOT). Of course, Ivy has no idea about those either. (2)

OK, we can disable uniqueVersion and get plain “SNAPSHOT” without the funny timestamps. But no: Maven 3 dropped that option, and uniqueVersion is always on. (3)

Adding up (1), (2) and (3), we had a huge incompatibility problem on our hands. Some research came up empty (Maven blaming Nexus, Nexus blaming Ivy, Ivy asking why Maven?…), so we fell back to disabling our Ivy publish steps and using mvn deploy:deploy-file instead. We reconfigured our Jenkins accordingly and finally got around the problems.

Bottom line: don’t use ivy and maven together; it’s not logical, highly unnecessary and probably expensive.

Apache ODE and CLOB issue

I took over some responsibilities from a recently departed colleague, and with them I was kinda forced to return to the JEE world. These are not exactly the technologies and frameworks I am used to, but once you hate one part of something, you are unlikely to enjoy the other parts either.

Anyway, my first assignment was to move some WS-BPEL processes from GlassFish to Apache ODE. It sounds easy, since WS-BPEL is a standardized and well-acknowledged specification, but only an inexperienced and/or naive developer believes that. Standards are never that standard. Only the simplest hello-world can be deployed to more than one (two at most) container without a problem. Your JPA application will never port from Hibernate to TopLink, and your standards-compliant webpage will never look the same in IE. Not without some unknown number of hours/days of hard work, that is.

But this time I got lucky. The hard part had already been done and documented (1|2|3) by Hilal Tarakci (still not tweeting!), with whom I’ve been working more closely now. The last problem was the easiest one, but it let me steal all the credit. By default, ODE uses a Derby database, which doesn’t like CLOBs larger than a certain size (1048576 in the trace below) and barfs like this when it encounters one:

java.sql.SQLException: An unexpected exception was thrown
	...
Caused by: java.sql.SQLException: An unexpected exception was thrown
	...
Caused by: java.sql.SQLException: Java exception: 'A truncation error 
  was encountered trying to shrink CLOB '' to length 1048576.:
  org.apache.derby.iapi.services.io.DerbyIOException'.
	...
Caused by: org.apache.derby.iapi.services.io.DerbyIOException:
  A truncation error was encountered trying to shrink CLOB ''
  to length 1048576.

I guess this was somewhat expected, because there is a small tutorial in ODE’s installation docs showing how to configure it to work with a MySQL DB. The distribution package also contains DDLs for Oracle, but if you’re already running a PostgreSQL server and don’t want another link in the chain, you’re (not) alone. Without further ado, here are the things you should do.

  1. Create the database you wish to use on the server you wish to use.
  2. Get this SQL piece and execute it on that database.
  3. Take this context snippet, modify it as necessary, and place it into the <Host> part of your $TOMCAT_HOME/conf/server.xml.
  4. Get a JDBC driver jar from PostgreSQL and place it into $TOMCAT_HOME/lib.
  5. Get this properties file and place it into $TOMCAT_HOME/webapps/ode/WEB-INF/conf.
  6. Start Tomcat.

And yes, I mostly got this from the original tutorial. The only thing I did was edit the SQL into a form PostgreSQL would understand. For those of you running something bigger than Tomcat, it should be easier to define a JDBC connection on JNDI.

Hadoop MapReduce job statistics (a fraction of them)

Well, this has been on my backlog for a while. The problem is extremely simple, actually: when did a MapReduce job start processing? I need this info to report to the clients using my API, so redirecting them to the JobTracker’s web interface is not an option.

Everyone who has used Hadoop for some time knows 0.20 is the version to use, and everyone who has developed something other than a WordCount knows it’s a PITA. The API is hard to use at best, and misleading and incomplete most of the time. You might wonder how hard it can be to extract a basic piece of information such as the start time of a job, one that is easily accessible over the web interface. All I can say is: very.

Without further ado: while I would expect something like Job.instance("JOB_ID").getStartTime(), here is the piece of crappy code I found to be working:

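import java.lang.reflect.Field;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.*;

long startTime(String jobIdString) throws Exception {
  Configuration conf = new Configuration();
  JobClient jobClient = new JobClient(new JobConf(conf)); // deprecation WARN
  JobID jobID = JobID.forName(jobIdString);               // deprecation WARN
  RunningJob runningJob = jobClient.getJob(jobID);
  // RunningJob has no getStartTime(), but its implementation keeps a
  // JobStatus in a private field named "status", so read it reflectively.
  Field field = runningJob.getClass()
      .getDeclaredField("status"); // reflection !!!
  field.setAccessible(true);
  JobStatus jobStatus = (JobStatus) field.get(runningJob);
  return jobStatus.getStartTime(); // finally
}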

As noted above, JobConf and JobID are deprecated, but since there is no way of working with anything non-deprecated, we reluctantly accept that. What we may not accept is using reflection, but well… I couldn’t find any other way (please point me to one if you know). It is actually funny that the information sits in the status field of runningJob but cannot be accessed, because there is no getStartTime() method reading from it. (BTW, v0.21 is closer to what I expect, but it is largely unusable for various reasons.)

On the other hand, that wasn’t exactly my requirement. There may be a delay between the time I submit a job and the time it starts processing, mostly because the cluster is busy. What I needed was when the job actually started processing, meaning the time the first task was fired on a TaskTracker. Now I would expect something like Job.instance("JOB_ID").getTasksOrderedByStartDate().get(0).getStartTime(), but I know I won’t get what I expect. Instead:

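import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.*;

long actualStartTime(String jobIdString) throws IOException {
  Configuration conf = new Configuration();
  JobClient jobClient = new JobClient(new JobConf(conf)); // deprecation WARN
  JobID jobID = JobID.forName(jobIdString);               // deprecation WARN
  RunningJob runningJob = jobClient.getJob(jobID);
  TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents(0);
  if (events.length < 2) {
    return -1; // at most the SETUP task has completed so far
  }
  TaskID firstCompletedTaskID =                           // deprecation WARN
      events[1].getTaskAttemptId().getTaskID();
  for (TaskReport tr : jobClient.getMapTaskReports(jobID)) {
    if (tr.getTaskID().equals(firstCompletedTaskID)) {
      return tr.getStartTime(); // search !!!
    }
  }
  return -1; // first completed task was not among the map tasks
}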

The first task completion event belongs to the SETUP task, which runs at job submission time no matter what the cluster is busy with. That’s why I’m taking the second element of the array, using [1].

One small problem is that I’m using task completion events, not task start events, so I am assuming the first task to finish is also the first task to start. This usually holds in my case, but I know it will not apply to others.
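One way around that assumption is a different technique from the one used above: take the minimum start time over all map task reports instead of following completion events. In Hadoop, the reports would come from jobClient.getMapTaskReports(jobID) and TaskReport.getStartTime(), as in the snippet above.

In the sketch below, TaskInfo is a hypothetical stand-in so the logic runs without a cluster. Treating a start time of 0 as "not started yet" is also an assumption.

```java
import java.util.List;
import java.util.OptionalLong;

public class EarliestTaskStart {
    // Hypothetical minimal view of a task report: start time in ms since epoch,
    // assumed to be 0 for tasks that have not started yet.
    public record TaskInfo(String taskId, long startTime) {}

    // Earliest start time among tasks that have actually started, if any.
    public static OptionalLong earliestStart(List<TaskInfo> tasks) {
        return tasks.stream()
                .mapToLong(TaskInfo::startTime)
                .filter(t -> t > 0)
                .min();
    }
}
```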

I haven’t found a way to get a job’s finish time yet, so I’m using job.end.notification.url for that. Hadoop sends an HTTP GET to that URL when a job finishes, so I simply record the time the service was called. It may not be accurate, but again, it works for me.
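Here is a minimal sketch of such a receiver. It uses the JDK’s built-in com.sun.net.httpserver server instead of a servlet container.

The /jobend path, the port and the query parameter names are assumptions. Hadoop substitutes $jobId and $jobStatus in job.end.notification.url, so the property could be set to something like http://myhost:8000/jobend?jobId=$jobId&jobStatus=$jobStatus, where myhost is a placeholder.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class JobEndListener {
    // jobId -> wall-clock time (ms) at which the end notification arrived
    private final Map<String, Long> finishTimes = new ConcurrentHashMap<>();
    private final HttpServer server;

    public JobEndListener(int port) throws IOException {
        server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/jobend", exchange -> {
            long now = System.currentTimeMillis();
            String jobId = param(exchange.getRequestURI().getQuery(), "jobId");
            if (jobId != null) {
                finishTimes.put(jobId, now);   // approximate finish time
            }
            byte[] body = "OK".getBytes();
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
    }

    public int port() { return server.getAddress().getPort(); }
    public Long finishTime(String jobId) { return finishTimes.get(jobId); }
    public void stop() { server.stop(0); }

    // Minimal query-string lookup; returns null if the parameter is absent.
    public static String param(String query, String name) {
        if (query == null) return null;
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0 && pair.substring(0, eq).equals(name)) {
                return pair.substring(eq + 1);
            }
        }
        return null;
    }
}
```

As with the servlet approach, the recorded time is when the notification arrived, which may lag slightly behind the real finish.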

In light of these difficulties, I am thinking about a simple application that serves easily parseable job information. It would probably be rendered obsolete when 0.22 is out, but it might still be useful for consuming such info from languages other than Java.

Scalatra result announcer w/ various datastores

In the previous post, I talked about an examination result announcer application in Scalatra/Scalate/ScalaQuery. I mentioned I would try the app with different datastores another day and post the results. That day has arrived at last.

Since I’ve talked about the system before, I’ll keep things short this time and jump straight to the results.

Datastore            RPS    p90 (ms)
In memory            591    238
Voldemort (0.90)     552    256
Mongo (1.8.2)        548    255
Redis (2.2.12)       523    265
Cassandra (0.8.2)    504    273
HBase (0.90.3)       471    285
MySQL (5.5.15)       453    346
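The p90 column is the 90% line JMeter reports. For reference, below is a minimal nearest-rank percentile computed from raw latency samples, together with RPS. JMeter’s own percentile calculation may pick the rank slightly differently, so treat this as an illustration rather than a reimplementation.

```java
import java.util.Arrays;

public class Percentile {
    // Nearest-rank percentile: the smallest sample such that at least p% of
    // all samples are <= it. p must be in (0, 100].
    public static long nearestRank(long[] samplesMs, double p) {
        if (samplesMs.length == 0) throw new IllegalArgumentException("no samples");
        long[] sorted = samplesMs.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.length); // 1-based rank
        return sorted[Math.max(rank, 1) - 1];
    }

    // Requests per second over a whole test run.
    public static double rps(long requests, double elapsedSeconds) {
        return requests / elapsedSeconds;
    }
}
```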

In-memory storage means I used a scala.collection.mutable.Map object in my controller to collect the results. The result above was measured with Scala 2.9 parallel collections; without them, the numbers were slightly lower.

All 3rd party storage solutions were running on localhost with their default configurations. All of them were accessed with their preferred or well-known drivers.

I did not try to optimize my code, as another of my goals is to measure how easy it is to get good performance with little effort.

As the table shows, Voldemort and Mongo are virtually the same in terms of these simple performance measurements. I guess this is because both storage systems work directly off memory. Since everything was left at its defaults, Voldemort was using in-process BDB-Java, and Mongo was using memory-mapped files (I guess). On the other hand, although it is an in-memory system too, Redis fell slightly short of their performance. I suspect some configuration is required there.

Cassandra and HBase, with their BigTable-like storage mechanics, lag behind the others by small margins. I don’t know much about Cassandra, but running a pseudo-distributed HDFS and HBase (on the same computer as the application and JMeter) is highly discouraged. And I guess I know enough HBase to say that it is not the perfect fit for this example application. The results simply reflect that.

Since my Linux installation has updated itself several times in the last two months, I reran the MySQL tests to keep things fair. You might have noticed its performance has also improved, but not to the point where it can compete with the others.

All the code is on github. Switch branches for different stores.

NOTE: Those system updates affected my CouchDB too, upgrading it to 1.1.0. Long story short, my application running on it outperformed everything on the list with 649 requests per second, and 90% of requests were served in under 201ms.

NOTE: Another thing I discovered is that using rewrites in CouchDB drops performance to 550 RPS and a 230ms p90. Interesting…

Announcing results with scalatra

As other posts in the series mentioned, I am trying out some web frameworks and data stores with a small web application that would be used to announce the results of a hypothetical exam. You can find the details in the first post. Today, the example app will be built with Scalatra.

Scalatra is a Sinatra-like lightweight web framework written in Scala. For developers who have been living under a rock for a few years, Sinatra-like means just enough tools to map a URL to a method. No ORM, no templating, no authentication over LDAP. Just URLs and methods. Scala is a programming language that tries to combine object-oriented and functional concepts. Google is full of sites listing reasons why Scala is a great language.

It is clear that a programming language and a simple web framework are far from enough to develop a web application nowadays. In order to query results, they have to be stored in some database first, and for this specific example I’ll use MySQL.

A simple web application like this one would not actually require another layer on top of the database. But since I am evaluating the ecosystem rather than developing a real-life application, I’ll add ScalaQuery into the mix. I hope it will ease the pain JDBC would induce.

On the UI side, mixing HTML with application logic is pretty much accepted as bad practice, so I’ll use Scalate as the templating engine. As all of the tools lean towards sbt, it will be the bowl holding the soup together.

Since Scalatra is the core piece of the environment, it seemed logical to start with it and add the other ingredients as I go. There are two ways to create a Scalatra project skeleton. The first is simply cloning this repo. The other requires installing giter8 and sbt separately. Both come with Scalate included. The difference is that the former uses the sbt 0.7 series, while the latter uses sbt 0.10. If you’re like me and tend to walk on the edge, you’ll need to run “g8 scalatra/scalatra-sbt” after installing these shiny tools. It will ask some questions about the project and create it.

$ g8 scalatra/scalatra-sbt
organization [com.example]:
name [scalatra-sbt-prototype]: resultannounce
servlet_name [MyScalatraFilter]: ResultAnnouncer
scala_version [2.9.0-1]: 2.9.0
version [1.0]:

After that is finished, cd into your project and run “sbt”. It will download some files and give you the sbt shell. Run “update” to get your dependencies, followed by “jetty-run” to see a Hello application running on localhost:8080… Now get coding!

The first thing I did was change the default Hello screen to the login page, written as the main.jade template.

  get("/") {
    templateEngine.layout(root+"main.jade")
  }

After setting the form to simply POST to a URL like /r/idnumber, it was easy to handle the values on the Scalatra side.

  post("/r/:idn") {
    val result = Result of (params("idn"),params("pass"))
    result match {
      case Some(_) => templateEngine.layout(
        root+"result.jade", Map("result"->result))
      case _ => templateEngine.layout(
        root+"main.jade", Map("formErr"->"Wrong Details"))
    }
  }

The Result here is a DAO I threw together with my limited Scala and virtually non-existent ScalaQuery knowledge.

object Result {
  val db = Database.forURL("jdbc:mysql:///ss?user=root", 
    driver="com.mysql.jdbc.Driver")
  def of(id:String, passwd:String) = {
    db withSession {
      val q = for (e <- Results if e.id === id) yield e
      q.firstOption // None when no row matches, so the Some(_) case in the route can match
    }
  }
}
object Results extends
    Table[(String, String, String, String)]("results") {
  def id = column[String]("id", O PrimaryKey)
  def passwd = column[String]("passwd", O NotNull)
  def name = column[String]("name", O NotNull)
  def result = column[String]("result", O NotNull)
  def * = id ~ passwd ~ name ~ result
}

There is a password-hash control step too but i omitted it for this post. The Results object defines the database table and enables us to query without writing any SQL.

After some manual testing to check that everything was in working order, I loaded 3M results into that table and started hammering the application with JMeter.

Simple stress tests on the login screen, which does not touch the database or anything else, yielded 658 requests per second for 100 concurrent users, with 90% of the requests served within 180ms. The CouchDB test on the same machine gave 500 requests per second with a 290ms 90% line, so Scalatra and Scalate seem like an improvement.

But as I have mentioned before, this is hardly a real-life scenario: users generally won’t bounce off the login screen; they will log in using their details and try to see their exam results. Using a JMeter workbench for this scenario, requests per second dropped to 396 and the 90% line increased to 413ms. Now that seems like a small step back from what I was able to achieve with CouchDB. I’ll try to identify the pieces (Scalate, ScalaQuery, MySQL) causing the slowdown, but that’s for another day.

The scalability side is the same as with CouchDB: the application is totally stateless, so running mirrors behind a load balancer should be enough to raise the RPS to whatever is required. Since there are no writes, there shouldn’t be any problem scaling them.

The whole application took me a whole day, including the time needed to get to the default Hello screen. Besides MySQL, all the tools here were new to me, yet I was able to build something that solves a problem. Although I am a newbie, Scala is fun to work with, and considering what I was able to accomplish in a day, it is nowhere near complex. But CouchDB still seems a better fit for this kind of problem: results are documents, and a document store with HTML rendering capabilities is all one can ask for.

UPDATE: Code is now on github

Resource synchronization on Hadoop clusters with ZooKeeper - Part II

Straight from where I left off. GPUs are massively parallel in contrast to CPUs, so for some parallel processes they are damn fast. The benchmarks you see around showing performance increases of over 100x are theoretically true. By theoretically I mean pure CPU vs GPU computing power. In other words, for an infinitely running computation, a GPU can produce 100x more results than a CPU core in the same amount of time. But as experienced GPGPU developers would undoubtedly know, in practice things rarely happen that way.

First of all, only a small fraction of commercially meaningful computations run infinitely. The first infinitely running computation that comes to my mind is calculating the digits of pi. That might make you some money if you are into cryptography or something, but I guess it is safe to say that is both a niche and a dominated market. Another such computation may be fractal generation, and I have yet to meet anyone making money out of generating colorful images. Businesses sell results, and to get results, processes must end at some point.

Two well-known facts about finite processes are that they need some data beforehand and they output some data afterwards. That means the duration of any process is roughly the sum of IO time and computation time. A GPU may decrease computation time, but it cannot change IO time, and the IO will eat into your 100x expectations. Bare computing time may decrease while IO time stays the same (actually it increases in GPGPU processes, but that’s another story). Depending on the type of the problem, you may settle for 3x performance or less.
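The IO argument above is essentially Amdahl’s law. Below is a small sketch with illustrative numbers that were not measured: 1 s of IO, 2 s of compute, and a kernel 100x faster on the GPU. With these inputs, the overall speedup collapses to roughly 2.9x.

```java
public class IoBoundSpeedup {
    // Overall speedup when only the compute part of a job is accelerated.
    // ioSec and computeSec are the CPU-only baseline durations;
    // computeFactor is how much faster the GPU runs the compute part.
    public static double speedup(double ioSec, double computeSec, double computeFactor) {
        double before = ioSec + computeSec;
        double after = ioSec + computeSec / computeFactor;
        return before / after;
    }

    public static void main(String[] args) {
        // Illustrative numbers only: 1 s IO, 2 s compute, 100x faster kernel.
        System.out.println(speedup(1, 2, 100));
    }
}
```

No matter how large computeFactor gets, the result can never exceed (io + compute) / io, which is 3x in this example.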

So, as shown in Part I, if you configure your systems so that your resources (GPUs) are occupied by only one process at any given time, you do not use them optimally. Suppose you set your MapReduce TaskTrackers’ maximum number of simultaneous map tasks to the number of resources on the system. The JobTracker will then wait for each task to finish before starting another one, and your resources will sit idle during the IO part of these tasks.

One solution is to use more than one process per resource. It is possible to start two processes for each resource and let one use the resource while the other does its IO. When the first one is done with the resource, it can signal the other one to start. This way, resources never have to wait for IO to finish.

This process signaling mechanism fits perfectly with ZooKeeper’s watches. You can set a watch on a znode, and ZooKeeper will notify you when there is a change on it. In this particular problem, the second process may set a watch on a common znode. When the first process is done with the resource, all it has to do is modify the znode to let the second process know it has finished.

This is exactly what ResourceSynchronizer does. When you call .request(), it returns the next free resource. If there aren’t any free resources, it waits for another process to call .release() before returning one. So the process blocks before using the resource.
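ResourceSynchronizer’s internals aren’t described here, so what follows is only an in-process analogue of the claim-or-wait loop, not the ZooKeeper code itself. The `taken` set stands in for the znodes under /pool, and wait()/notifyAll() stands in for the watch. ClaimAndWaitPool is a hypothetical name.

With ZooKeeper, one common way to claim a resource is create() with CreateMode.EPHEMERAL. That call fails with KeeperException.NodeExistsException if someone else already holds the node.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ClaimAndWaitPool {
    private final List<String> resources;
    // Plays the role of the ephemeral znodes under /pool: present = taken.
    private final Set<String> taken = new HashSet<>();
    // Plays the role of the watch: waiters are woken on any change.
    private final Object lock = new Object();

    public ClaimAndWaitPool(List<String> resources) {
        this.resources = List.copyOf(resources);
    }

    // Returns the first free resource, blocking while all are taken.
    public String request() throws InterruptedException {
        synchronized (lock) {
            while (true) {
                for (String r : resources) {
                    if (taken.add(r)) {      // like a successful create()
                        return r;
                    }
                }
                lock.wait();                 // "No available resource, waiting..."
            }                                // woken: "Retrying to get another resource"
        }
    }

    public void release(String resource) {
        synchronized (lock) {
            taken.remove(resource);
            lock.notifyAll();                // like the watch firing
        }
    }
}
```

Unlike the ZooKeeper version, this only coordinates threads inside one JVM. The point of using ZooKeeper is that the same protocol works across processes and machines.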

You can set up the same pool/resource list and spawn some processes to see the effect. Say you set your resources to [“res0”, “res1”], and your resource-intensive procedure takes 10 seconds. If you start 3 processes within 10 seconds, the first 2 will get “res0” and “res1” respectively from their rs.request() calls, while the last process will wait until one of the first two calls rs.release().

ResourceSynchronizer rs = new ResourceSynchronizer(
    new ZooKeeper("zkensemble", 20000, null), 
    "/pool", new String[]{"res0", "res1"});
log.warn("Requesting resource...");
log.warn("Got resource : " + rs.request() + ". Working...");
Thread.sleep(10000);
log.warn("Process Done! Releasing Resource");
rs.close();

Output of process #1

19:01:16,799 WARN  Run - Requesting resource...
19:01:16,824 INFO  ResourceSynchronizer - ZK ensemble connected
19:01:16,846 INFO  ResourceSynchronizer - Resource '/pool/res0' allocated
19:01:16,846 WARN  Run - Got resource : res0. Working... 
19:01:26,846 WARN  Run - Process Done! Releasing Resource
19:01:26,853 INFO  ResourceSynchronizer - Resource released
19:01:26,861 INFO  ResourceSynchronizer - ZK connection closed

Output of process #2

19:01:22,504 WARN  Run - Requesting resource...
19:01:22,518 INFO  ResourceSynchronizer - ZK ensemble connected
19:01:22,541 INFO  ResourceSynchronizer - Resource '/pool/res1' allocated
19:01:22,541 WARN  Run - Got resource : res1. Working... 
19:01:32,541 WARN  Run - Process Done! Releasing Resource
19:01:32,556 INFO  ResourceSynchronizer - Resource released
19:01:32,564 INFO  ResourceSynchronizer - ZK connection closed

Output of process #3

19:01:23,967 WARN  Run - Requesting resource...
19:01:23,992 INFO  ResourceSynchronizer - ZK ensemble connected
19:01:24,003 INFO  ResourceSynchronizer - No available resource, waiting... 
19:01:26,854 INFO  ResourceSynchronizer - Retrying to get another resource
19:01:26,871 INFO  ResourceSynchronizer - Resource '/pool/res0' allocated
19:01:26,871 WARN  Run - Got resource : res0. Working... 
19:01:36,871 WARN  Run - Process Done! Releasing Resource
19:01:36,886 INFO  ResourceSynchronizer - Resource released
19:01:36,894 INFO  ResourceSynchronizer - ZK connection closed

Output of process #4

19:01:26,802 WARN  tool.Run - Requesting resource...
19:01:26,823 INFO  ResourceSynchronizer - ZK ensemble connected
19:01:26,835 INFO  ResourceSynchronizer - No available resource, waiting... 
19:01:26,854 INFO  ResourceSynchronizer - Retrying to get another resource
19:01:26,879 INFO  ResourceSynchronizer - No available resource, waiting... 
19:01:32,556 INFO  ResourceSynchronizer - Retrying to get another resource
19:01:32,573 INFO  ResourceSynchronizer - Resource '/pool/res1' allocated
19:01:32,573 WARN  Run - Got resource : res1. Working... 
19:01:42,573 WARN  Run - Process Done! Releasing Resource
19:01:42,581 INFO  ResourceSynchronizer - Resource released
19:01:42,590 INFO  ResourceSynchronizer - ZK connection closed

These 4 outputs come from the same piece of code running 4 times, a few seconds apart. If you check the timing in the logs, the 1st and 2nd processes got res0 and res1 right after they requested them. The 3rd process took res0 right after the 1st one released it. The 4th process also tried for res0 after the 1st one released it but lost to the 3rd, so it waited for the 2nd one to release res1. The timestamps show that the resources were left idle for less than 20 milliseconds.

I have also added a REUSE setting to the code that lets each resource be handed out a given number of times. In the previous example, if REUSE is set to 2, the first 4 processes will get “res0”, “res1”, “res0”, “res1”, and the fifth one will wait for a resource to be freed up. With small modifications, I am sure it could also address the IO-increasing properties of GPGPU processes. That would probably require changing the way you access your application, though, so I am not there yet.
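A REUSE-style pool can be sketched in-process with a BlockingQueue that holds each resource name REUSE times. The names are enqueued round-robin, so handouts follow the res0, res1, res0, res1 order described above. This illustrates the behaviour only; it is not the actual ResourceSynchronizer code, and ReusablePool is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ReusablePool {
    private final BlockingQueue<String> slots = new LinkedBlockingQueue<>();

    // Each resource gets `reuse` slots, enqueued round-robin so callers receive
    // res0, res1, res0, res1, ... before anyone has to wait.
    public ReusablePool(List<String> resources, int reuse) {
        for (int i = 0; i < reuse; i++) {
            slots.addAll(resources);
        }
    }

    // Blocks until a slot is free, then hands out that resource's name.
    public String request() throws InterruptedException {
        return slots.take();
    }

    // Returns one slot for the given resource to the pool.
    public void release(String resource) {
        slots.add(resource);
    }

    // Number of slots currently free.
    public int available() {
        return slots.size();
    }
}
```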

UPDATE: A colleague suggested “A semaphore implementation using ZooKeeper” as the title, which would be appropriate but not entirely correct. As a careful reader might notice, the mechanism is neither binary nor counter-based. Instead, it holds the names of the resources it is supposed to allocate.

Resource synchronization on Hadoop clusters with ZooKeeper - Part I

“We need ZooKeeper to run HBase.” Until last week, that was basically my view of ZooKeeper. It is a distributed configuration and coordination service; HBase requires it, so we have to put it in the cluster. For a cluster of our size one instance seems to be enough, but we are running three. These four sentences pretty much summed up what I knew about it. Fortunately, I had looked at its main page before and remembered its somewhat abstract purpose: “distributed coordination”.

First, a clarification: this post is about coordinating processes that require access to some limited system resource. To keep things simple, I use one example resource throughout the post: the GPU. Other examples might be distributed CD publishing using Hadoop on machines with several CD writers, or controlling an array of Arduino devices to simulate the LHC. In short, this post has no direct relationship to GPGPU programming or GPU kernel thread synchronization. If you arrived here googling for those, I’m sorry.

So, the problem: if you put several CPU cores in a single computer and start running processes, the operating system will schedule them onto the cores accordingly. Say you have a 4-core machine and try to multiply 4 matrices simultaneously; each multiplication will run on a different core. I don’t know whether there have been any developments on this, but that is not the case for multi-GPU systems. Say you have 4 GPUs on a machine and spawn 4 processes, each meant to multiply a matrix on its own GPU: you need to explicitly tell those processes not to overlap with one another. If you leave it to the OS, one or more of your GPUs may sit idle while processes compete for the others. I heard Mac OS can manage this, but it is not suitable for our environment.

In theory: nothing but you can decide which process (read: map task in a MapReduce job) should occupy which resource. The simplest solution is to give each process the identifier of the resource it should operate on, for example by launching it with the appropriate parameters. But this would mean controlling every process by hand, which is not possible with MapReduce. Another solution is to have processes ask some daemon process which resource to use. The daemon keeps track of which process holds which resource and answers other clients accordingly. This is actually how we have been working for a while now.
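The bookkeeping such a daemon needs is small. Below is a hypothetical in-memory AllocationTable, a sketch rather than the daemon we actually ran, that maps process ids (for example map task attempt ids) to resource names. The network interface clients would use to reach the daemon is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a central allocator's state: which process holds
// which resource. Only the bookkeeping is shown, not the client protocol.
class AllocationTable {
  private final List<String> resources;
  private final Map<String, String> byProcess = new HashMap<>(); // process id -> resource

  AllocationTable(List<String> resources) {
    this.resources = new ArrayList<>(resources);
  }

  // Returns the resource this process already holds, otherwise the first free
  // one, otherwise null (the client should ask again later).
  synchronized String allocate(String processId) {
    String held = byProcess.get(processId);
    if (held != null) {
      return held;
    }
    for (String r : resources) {
      if (!byProcess.containsValue(r)) {
        byProcess.put(processId, r);
        return r;
      }
    }
    return null;
  }

  // Frees whatever the process held; does nothing if it held nothing.
  synchronized void free(String processId) {
    byProcess.remove(processId);
  }
}
```

A process that asks twice keeps its GPU, and a process that finds everything taken gets null. Keeping this state alive and correct on every machine is exactly the kind of burden a separate daemon brings.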

In practice: like any other software component, this daemon brings maintenance issues. It is just another service one needs to deploy to the machines in the cluster and keep working properly. Because of this, we were hesitant to go to production with this setup and were looking for another solution. I am not sure how it happened, but ZooKeeper seemed like it could do such a thing. Let me rephrase that: we thought we could do GPGPU process synchronization with ZooKeeper without actually knowing what ZooKeeper does. It is a distributed coordination service, right? How hard could it be?

After reading the “Getting Started” part of the ZooKeeper documentation, I saw that I had been dealt blackjack. I can create data nodes (called znodes), store small amounts of data in them, and access them from any other process: a complete solution to what had kept us scratching our heads. All I needed to do was modify my mappers a little to talk to ZooKeeper before and after processing. Since GPUs must be coordinated on a per-computer basis, each mapper needs to know which computer it runs on and which GPUs that node has. I applied Occam’s razor and got this:
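The znode idea can be illustrated without a running ensemble. In ZooKeeper, create() on a path that already exists fails with KeeperException.NodeExistsException, and a node created with CreateMode.EPHEMERAL disappears when its creator’s session ends. The hypothetical ZnodeStyleClaims below fakes the first property with ConcurrentHashMap.putIfAbsent and scopes paths per host under /gpusync, so nvidia0 on one node never collides with nvidia0 on another. It is an assumption-laden sketch of one way to claim a GPU, not the ResourceSynchronizer from this post.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a znode tree: claiming a GPU means creating a node
// that must not already exist. Real ZooKeeper would use create() with
// CreateMode.EPHEMERAL and catch NodeExistsException instead.
class ZnodeStyleClaims {
  private final ConcurrentMap<String, String> tree = new ConcurrentHashMap<>(); // path -> owner

  // Per-host path, e.g. /gpusync/node01/nvidia0.
  static String path(String root, String host, String gpu) {
    return root + "/" + host + "/" + gpu;
  }

  // Atomically creates the node if absent; false means someone else holds it.
  boolean tryCreate(String path, String owner) {
    return tree.putIfAbsent(path, owner) == null;
  }

  // Deleting the node frees the GPU (an ephemeral znode also vanishes on session end).
  void delete(String path) {
    tree.remove(path);
  }

  // Tries this host's GPUs in order and claims the first free one, or returns null.
  String claim(String root, String host, List<String> gpus, String owner) {
    for (String gpu : gpus) {
      if (tryCreate(path(root, host, gpu), owner)) {
        return gpu;
      }
    }
    return null;
  }
}
```

Unlike this map, a real ZooKeeper create() also requires the parent node (here /gpusync/<host>) to exist already.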

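  import java.io.File;
  import java.io.IOException;
  import java.net.InetAddress;
  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.List;

  // Name of the machine this mapper runs on, used to scope GPU coordination per host.
  private String hostname() throws IOException {
    return InetAddress.getLocalHost().getHostName();
  }

  // Lists GPU device names (nvidia0, nvidia1, ...) under /dev. Matching only
  // "nvidia" followed by digits skips nvidiactl and other nvidia-* device nodes.
  private static String[] discoverGpus() {
    List<String> gpus = new ArrayList<>();
    File[] devices = new File("/dev").listFiles();
    if (devices != null) { // listFiles() returns null on an I/O error
      for (File device : devices) {
        if (device.getName().matches("nvidia\\d+")) {
          gpus.add(device.getName());
        }
      }
    }
    Collections.sort(gpus); // listFiles() does not guarantee any order
    return gpus.toArray(new String[0]);
  }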

On our nodes, each GPU appears as a device under /dev with a sequential name: nvidia0, nvidia1, … There is also a device named nvidiactl, which is not a GPU. The other additions to my mappers are these:

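  // in setup(): one ZooKeeper session per mapper, scoped to this host's GPUs
  rs = new ResourceSynchronizer(
      new ZooKeeper(QUORUM_ADDRESS, TIMEOUT, null),
      "/gpusync/" + hostname(),
      discoverGpus());

  // in map(): wait for a free GPU on this host, use it, then hand it back
  String gpu = rs.request();
  try {
    process(gpu); // whatever needs the GPU
  } finally {
    rs.release(); // release even if processing fails
  }

  // in cleanup()
  rs.close();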

Now, I cheated a little here and left out the core piece, ResourceSynchronizer. That’s because, in addition to holding the GPU names and handing them out to the processes that need them, it performs one more, somewhat more sophisticated task related to GPGPU operations. Seasoned GPU developers may guess what it is, but I have left it for Part II of this post.