Monday, June 8, 2009

Streaming Data with Hadoop and SQS -- a work in progress

Hadoop is by design a batch-oriented system. Take a bunch of data, run a series of maps and reduces with it across X machines, and come back when it's done. A Hadoop cluster has high throughput and high latency. In other words, it takes a while, but a lot of stuff gets done.

At work, I'm leading a team that is implementing a data processing pipeline. The primary use case that we need to address involves quickly handling changes in the entity data we care about -- people, places, and things that act or act upon other people, places and things. Specifically, this means:
  • detect that there has been a change
  • get the changed data into the system
  • apply a series of transforms to it for downline rendering and searching systems
This use case requires a streaming solution. Each piece of data needs to be transformed and pushed into production relatively quickly. Note that there is an acceptable latency. If, for instance, Famous Actor X dies, our system needs to detect it and update the data within the hour. However, detecting/updating data within a day would be too slow to be useful to someone wanting to know what was up with Famous Actor X.

At first glance, Hadoop is not a good fit for this problem. It takes file-based inputs and produces file-based outputs, so any individual piece of data moving through the system is limited by the speed at which an entire input set can be run through a cluster.

However, Hadoop has the following features that make it ideal for distributing work:
  1. The MapReduce abstraction is a very powerful one that can be applied to many different kinds of data transformation problems.
  2. The primary Mapper and Reducer interfaces are simple enough to allow many different developers to ramp up on the system in minimal time.
  3. HDFS allows the developer to not worry about where they put job data.
  4. Cluster setup and maintenance, thanks to companies like Cloudera (who fixed the issues I was seeing with S3, thanks!), is taken care of.
  5. The logic around distributing the work is completely partitioned away from the business logic that actually comprises the work.
  6. Jobs that fail are re-attempted.
  7. I'm sure there's more.
As our system scales, we can only assume that the number of concurrent inputs, and therefore the system load, will grow. If we were to take the route of lowest initial effort and write our own multithreaded scheduler, we would have a much more straightforward solution at first. However, once the workload grows enough to swamp a single machine, we would end up having to deal with the headaches of distributed computing -- protocol, redundancy, failover/retry, synchronization, etc.

In fact, I was fully intending to write a quick scheduler app to stream data and throw it away when we reached a scale that required distribution, at which point I was going to use Hadoop. However, I soon realized that I would be solving problems that Hadoop already addresses -- scheduling, data access, retry logic -- and those were just the initial, non-distributed issues.

Still, there is the high latency/inherent batchiness of Hadoop. In order to work around the high latency problem, we're trying to enable streaming between MapReduces via SQS. The input of the entity data into the system and the various transforms of that data can be treated as a series of MapReduce clusters, where the transformation is done during the map, and any necessary collation is done during the reduce. The Reduce phase of each MapReduce can send a notification for the piece of data it has to the next MapReduce cluster.
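The chaining described above can be sketched in a few lines. This is only an illustration under loud assumptions: in-memory deques stand in for SQS queues, plain functions stand in for MapReduce jobs, and every name here (run_stage, normalize, add_search_key, the queue names) is made up for the example rather than taken from our system.

```python
from collections import deque


def run_stage(in_queue, out_queue, transform):
    """Drain in_queue, transform each message, and notify the next stage.

    Stands in for one MapReduce cluster: the transform is the map-side
    work, and appending to out_queue is the notification sent onward.
    """
    while in_queue:
        entity = in_queue.popleft()
        out_queue.append(transform(entity))


# Hypothetical transforms, invented for illustration only.
def normalize(entity):
    return {**entity, "name": entity["name"].strip().title()}


def add_search_key(entity):
    return {**entity, "search_key": entity["name"].lower().replace(" ", "_")}


# One queue sits in front of each stage; deques stand in for SQS here.
ingest_q, normalized_q, searchable_q = deque(), deque(), deque()

ingest_q.append({"name": "  famous actor x "})
run_stage(ingest_q, normalized_q, normalize)
run_stage(normalized_q, searchable_q, add_search_key)
```

Because each stage only knows about its input and output queues, a stage can be stopped, reconfigured, or scaled without the stages on either side needing to know.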

Of course, it is really inefficient to run MapReduce over a single piece of data at a time. So the listener on each SQS queue buckets incoming messages, aggregating them until either a max messages or a max time threshold is reached. At that point, the system writes the collected messages to a file and starts a MapReduce job on that file. This is mini-batching, and as long as it delivers the data within the specified system latency, it's all good.
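Here is a minimal sketch of that bucketing logic, to make the two thresholds concrete. It is not our listener code: the class name MiniBatcher, its methods, and the injectable clock are all assumptions made for the example, and the SQS polling and file writing are left out entirely.

```python
import time


class MiniBatcher:
    """Buffers messages until a count threshold or a time threshold is hit."""

    def __init__(self, max_messages, max_wait_seconds, clock=time.monotonic):
        self.max_messages = max_messages
        self.max_wait_seconds = max_wait_seconds
        self.clock = clock  # injectable so the time threshold can be tested
        self.messages = []
        self.first_arrival = None

    def add(self, message):
        """Buffer a message; return the batch if the count threshold is reached."""
        if not self.messages:
            # The wait is measured from the first message in the bucket.
            self.first_arrival = self.clock()
        self.messages.append(message)
        if len(self.messages) >= self.max_messages:
            return self.flush()
        return None

    def poll(self):
        """Call periodically; return the batch if the oldest message waited too long."""
        if self.messages and self.clock() - self.first_arrival >= self.max_wait_seconds:
            return self.flush()
        return None

    def flush(self):
        """Hand back whatever has been collected and start a fresh bucket."""
        batch, self.messages = self.messages, []
        self.first_arrival = None
        return batch
```

In this sketch, whatever add() or poll() returns would be written out as one input file and handed to a job. The time threshold is what bounds the latency a stage adds when traffic is light; the count threshold keeps a single file from growing without limit when traffic is heavy.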

What we are doing is definitely a work in progress. The reason is that there are several 'dials' in this process that need to be tuned and tweaked based on the size of the input data.
  1. number of messages/amount of time to wait. Of course, the larger the input files, the more 'efficient' the system. On the other hand, making those files larger may imply an increase in latency.
  2. number of concurrent jobs -- there is a sweet spot here -- it (again) doesn't make sense to launch 100 concurrent jobs. We will need to pay attention to how long a job takes before deciding to adjust the number of concurrent jobs up or down.
  3. number of transformations -- the bucketing required implies that every transform has a built-in latency that factors into the overall latency.
  4. cluster size -- it makes no sense to run 20 nodes where 2 would suffice, but there will be times when 20 is necessary.
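To see how dials 1 and 3 interact, a back-of-the-envelope estimate helps. The sketch below assumes the worst case for one piece of data is the sum, over every transform, of the maximum bucket wait plus the job run time; it ignores time spent waiting for a free job slot. The stage names and all the numbers are invented for illustration, not measurements from our system.

```python
def worst_case_latency_seconds(stages):
    """Sum the max bucket wait and estimated job time of every stage."""
    return sum(s["max_wait_seconds"] + s["job_seconds"] for s in stages)


# Made-up numbers: three transforms, each waiting up to five minutes
# to fill a bucket before its job starts.
stages = [
    {"name": "ingest", "max_wait_seconds": 300, "job_seconds": 120},
    {"name": "transform", "max_wait_seconds": 300, "job_seconds": 180},
    {"name": "index", "max_wait_seconds": 300, "job_seconds": 240},
]

budget_seconds = 60 * 60  # the "within the hour" target
total = worst_case_latency_seconds(stages)  # 1440 with these numbers
within_budget = total <= budget_seconds
```

Under these assumptions, adding a fourth transform or doubling the max wait eats directly into the hour, which is why the bucket size cannot be tuned for efficiency alone.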
Some of the dials may be adjustable on the fly -- sending messages to a cluster via an 'admin' queue allows us to change message size/max time/concurrent job numbers dynamically. Other dials may require a stop-reconfig-start of a cluster. One benefit of using SQS is that no messages are lost while we tweak these 'hard stop' dials.
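One way the 'admin' queue idea might look in code is sketched below. The JSON message format, the setting names, and the function apply_admin_message are all assumptions for the sake of the example; the only thing taken from the description above is that message count, max time, and concurrent job count can change on the fly while other dials cannot.

```python
import json

# Dials assumed to be adjustable without restarting the cluster,
# mapped to the type each value is coerced to.
ADJUSTABLE = {
    "max_messages": int,
    "max_wait_seconds": float,
    "max_concurrent_jobs": int,
}


def apply_admin_message(settings, body):
    """Return a copy of settings with the dials from a JSON admin message applied.

    Rejects dials that are not adjustable on the fly and non-positive values,
    leaving the original settings untouched.
    """
    update = json.loads(body)
    new_settings = dict(settings)
    for key, raw in update.items():
        if key not in ADJUSTABLE:
            raise ValueError(f"not adjustable on the fly: {key}")
        value = ADJUSTABLE[key](raw)
        if value <= 0:
            raise ValueError(f"{key} must be positive, got {value}")
        new_settings[key] = value
    return new_settings


settings = {"max_messages": 500, "max_wait_seconds": 300.0, "max_concurrent_jobs": 4}
updated = apply_admin_message(settings, '{"max_messages": 1000, "max_wait_seconds": 120}')
```

Returning a new dict rather than mutating in place means a bad admin message cannot leave the listener with half-applied settings. A message asking for a 'hard stop' dial such as cluster size is rejected outright in this sketch.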

We're not doing a classical MapReduce where we transform data and then collate it under a distinct key. We're doing simple transformations with the data. The reduce piece isn't really needed, because there is no collation to be done. Plenty of people use MapReduce this way, primarily because it allows them to easily decompose their logic into parallel processing units.

We are going even further by undermining one of the key strengths of MapReduce, the ability to operate on large chunks of data, and instead running lots of smaller jobs concurrently. The Hadoop framework makes this possible. I'm not sure how optimal this is, and expect that we will be tweaking the message size and concurrency dials for some time. One of the advantages that the underlying Hadoop framework offers us is flexibility, as evidenced by the kinds of dials we can tweak to get optimal system throughput and latency.

I'll keep updating as we learn more about how the system behaves, and what the 'right' settings of the dials are based on the amount of data being input. I don't know if this is necessarily the most correct or elegant way to build a pipeline, but I do know that it is a good sign the Hadoop framework is this flexible.


  1. Hey Arun,

    Glad to know the S3 issues you were having are all fixed!

    -Todd from Cloudera
