Tuesday, June 23, 2009

Streaming Data with a Worker/Agent based approach

Where I was going....
In my last post I described how, at work, we were investigating using Hadoop in a non-batch setting. I mentioned that despite not using Hadoop's ability to collate keyed data from large data sets, we were still investigating Hadoop because of the built-in robustness of the system:
  • Nodes are checked via 'heartbeat'.
  • Task status is centrally tracked.
  • Failed tasks are retried.
  • Work is pulled from the central JobTracker by TaskTrackers.
The basic pain points of maintaining highly available and robust functionality across a cluster of machines are taken care of, and that was the primary motivator for us to try to stream data across a batch-driven system.

However, as we moved into implementation it became fairly obvious that we were pounding a square peg into a round hole. A lot has been written about how Hadoop and HDFS don't work particularly well with small files -- the recommended solutions usually involve concatenating those files into something bigger to reduce the number of seeks per map job. While these problems were understandable in a system optimized to process huge amounts of data in batch, waiting to batch up large files wasn't an option given the low-latency requirement of our end users.

Especially disconcerting was the amount of work (and code) spent bundling queued work items into small files and submitting those files as individual jobs. The standard worker model -- multiple processes, each with multiple threads, running on multiple machines, pulling from SQS and processing the data -- seemed so much simpler than creating artificial batches.

A Swift Change of Direction
The rewrite took a matter of hours, dropped out a lot of code, and was a minor change to the overall architecture, which uses SQS to transition between workflow states and S3 to persist the results of data transformations. The move away from Hadoop was limited to intermediate worker processes -- we still use Hadoop to get the data into the system, because we are collating data across a set of keys when importing data. The latency went from somewhat indeterminate across mini batches to being the average time to process per thread. And the workers were easily built on Java's Callable interface -- developers could implement new workers by overriding a single method that took a string as input. When the latency of the system went up, simply adding more machines running more processes would take care of the problem.
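As a rough illustration of that worker shape (not the actual code from this system), here is a minimal sketch. The names QueueWorker, process, stop, and WorkerDemo are made up for the example, and an in-memory BlockingQueue stands in for SQS so the sketch stays self-contained.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical base class: runs until stopped, handing each message to process().
abstract class QueueWorker implements Callable<Integer> {
    private final BlockingQueue<String> queue; // assumption: stand-in for an SQS queue
    private volatile boolean running = true;

    QueueWorker(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    // The single method a developer overrides to create a new worker.
    protected abstract void process(String message) throws Exception;

    void stop() {
        running = false;
    }

    @Override
    public Integer call() throws Exception {
        int processed = 0;
        while (running) {
            // Poll with a timeout so the loop notices stop() promptly.
            String message = queue.poll(100, TimeUnit.MILLISECONDS);
            if (message == null) {
                continue;
            }
            // An exception thrown here ends call() early and surfaces via the Future.
            process(message);
            processed++;
        }
        return processed;
    }
}

class WorkerDemo {
    // Runs one worker over the given messages and returns what it produced.
    static List<String> run(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        List<String> results = new CopyOnWriteArrayList<>();
        QueueWorker worker = new QueueWorker(queue) {
            @Override
            protected void process(String message) {
                results.add(message.toUpperCase());
            }
        };
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(worker);
            long deadline = System.currentTimeMillis() + 5000;
            while (results.size() < messages.size() && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            worker.stop();
            future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow();
        }
        return results;
    }
}
```

Scaling out in this model just means submitting more worker instances to the executor, starting more processes, or adding more machines, all reading from the same queue.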

Distributed Availability and Retry Logic
Of course, that simplicity came with a price tag -- we lost the distributed bookkeeping that Hadoop provided. Specifically, we would have to implement:
  1. thread and process failure detection
  2. machine failure detection
  3. retry logic
All of these are nontrivial to implement. However, our need to stream instead of batch data meant that we would have ended up having to do the retry logic differently than Hadoop anyway. We need to catch and retry data failures at a work item level, not at an arbitrarily determined file split level.

Our retry logic is pretty simple, and uses S3 to persist workflow state per work item. We traverse a list of items in the queue, determine which ones have 'stalled out', and submit them to the appropriate queue as part of a retry. At the same time we clean up work items that have been fully processed, and get average processing time per workflow process. These three things are best done in an asynchronous manner, as -- you guessed it -- Hadoop jobs. They need to take advantage of Hadoop's collation functionality.
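The per-item decision at the heart of that scan can be sketched in a few lines. This is only an in-memory illustration under assumed names (RetryScan, WorkItem, findStalled, findFinished) and an assumed "last updated" timestamp per item; the real version runs as Hadoop jobs over state persisted in S3, and none of that plumbing is shown here.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: class, field, and method names are assumptions.
class RetryScan {

    // Hypothetical per-item workflow state (persisted in S3 in the real system).
    static final class WorkItem {
        final String id;
        final String queueName;    // queue for the item's current workflow step
        final Instant lastUpdated; // last time a worker touched the item
        final boolean finished;    // true once the final step has completed

        WorkItem(String id, String queueName, Instant lastUpdated, boolean finished) {
            this.id = id;
            this.queueName = queueName;
            this.lastUpdated = lastUpdated;
            this.finished = finished;
        }
    }

    // Unfinished items that have not been updated within the timeout are 'stalled'
    // and are candidates for resubmission to their queue.
    static List<WorkItem> findStalled(List<WorkItem> items, Instant now, Duration timeout) {
        List<WorkItem> stalled = new ArrayList<>();
        for (WorkItem item : items) {
            boolean expired = Duration.between(item.lastUpdated, now).compareTo(timeout) > 0;
            if (!item.finished && expired) {
                stalled.add(item);
            }
        }
        return stalled;
    }

    // Finished items whose persisted state can be cleaned up.
    static List<WorkItem> findFinished(List<WorkItem> items) {
        List<WorkItem> finished = new ArrayList<>();
        for (WorkItem item : items) {
            if (item.finished) {
                finished.add(item);
            }
        }
        return finished;
    }
}
```

The timeout is a tuning knob: too short and slow-but-healthy items get processed twice, too long and real failures sit unnoticed.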

Our thread failure logic is also pretty simple. Because I'm starting up Callable tasks and making them run until I shut them down, I can check to see if any of them have finished prematurely by calling isDone() on the Futures returned when submitting them to the ExecutorService.
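A minimal sketch of that check, assuming the Futures are kept in a list in submission order. ThreadMonitor and its method names are invented for the example; isDone() and get() are the standard java.util.concurrent.Future calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Illustrative helper: names are assumptions, not the original code.
class ThreadMonitor {

    // Workers are meant to run until shut down, so any Future that is already
    // done has exited prematurely. Returns the indices of those tasks.
    static List<Integer> finishedPrematurely(List<Future<?>> futures) {
        List<Integer> dead = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            if (futures.get(i).isDone()) {
                dead.add(i);
            }
        }
        return dead;
    }

    // Explains why a task stopped; get() does not block once isDone() is true.
    static String describe(Future<?> future) {
        if (!future.isDone()) {
            return "running";
        }
        try {
            future.get();
            return "exited without error";
        } catch (ExecutionException e) {
            // The exception thrown inside call() is wrapped as the cause.
            return "failed: " + e.getCause();
        } catch (CancellationException e) {
            return "cancelled";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

A monitoring thread could call finishedPrematurely on a timer, log describe() for each dead task, and resubmit a fresh worker in its place.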

Process failure can be monitored (and logged) by a watchdog program. Repeated process failure in this case is symptomatic of an uncaught exception being thrown in one of the process threads.

Machine failure is also easy to monitor. I need to expose a simple service on each machine to detect process and thread failures, and if that service is not reachable, I can assume that the machine is offline.
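One simple way to test reachability is a plain TCP connect with a timeout. The sketch below assumes the monitoring service listens on a known TCP port; the MachineCheck class, its method name, and the choice of a raw socket probe are all assumptions for illustration, not a description of the actual service.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Illustrative probe: treats a failed or timed-out TCP connect as "offline".
class MachineCheck {

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable hosts.
            return false;
        }
    }
}
```

A single failed probe can also mean a network blip or a crashed monitoring service on a healthy machine, so requiring a few consecutive failures before declaring a machine dead is probably safer.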

These may be fairly limited and crude methods of getting a highly available system in place, but they feel like the right primitives to implement because while I don't know why the system is going to fail, each of these methods gives me a way to know how it is failing.

The Conclusion (so far)
The morals of the story at this point are:
  1. Frameworks can be extremely powerful if used for their strengths, and extremely limiting if used for their secondary benefits. When it feels like I'm pounding a square peg into a round hole, I probably am. I think this is called 'design smell', and now that I know what it smells like, I'll start backing up a lot sooner in an effort to find the right tool for the job.
  2. It is always a good sign when a refactoring drops out lots of code.
  3. Having to implement the availability and robustness of the system we are writing has actually made it easier to understand. Even though we are implementing functionality that we once got for free, at least we understand the limitations of the availability and robustness solutions we put in place.

