Tuesday, June 30, 2009

Running Zookeeper on the Mac with Soylatte and Eclipse 3.4

I've been using Zookeeper to store a sequence number that a large number of processes can access and increment in a coordinated manner.

Zookeeper has a nice, simple interface, and exposes a set of primitives that easily allow me to implement guaranteed synchronized access to my magic sequence number. I'll post more later on the specific solution, but right now I want to detail some of the issues I've run into and the workarounds I've put in place.

I run on a Mac (OSX/Leopard), use Eclipse 3.4 for my Java development, and use soylatte for my JDK. I think a lot of other people run with this setup. I'm using Zookeeper 3.1.3.

My initial setup steps:
  1. I downloaded Zookeeper, untarred it, and installed it in /usr/local.
  2. I created a symlink from zookeeper-3.1.1 to zookeeper.
  3. From that dir I ran sudo ./bin/zkServer.sh start.

I immediately ran into a strange issue: I could connect to the zookeeper instance:

ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", ZookeeperBase.DEFAULT_TIMEOUT, this);

but could not create a node on it:

zk.create("/HELLO", foo, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

I kept getting timeouts. I've written my code to be 'timeout proof' because connection loss errors are to be expected under load in distributed environments, but I do kick out after 5 retries. Besides, I wouldn't expect to get the ConnectionLoss error when I am connecting to a localhost instance.
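
For reference, the 'timeout proof' wrapper I'm describing looks roughly like the sketch below. This is a simplified illustration, not our actual code -- the class name, the linear backoff, and the MAX_RETRIES constant are all mine -- but it shows the shape of it: retry on ConnectionLoss and bail out after 5 attempts.

import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;

public class RetryingCreate {
    private static final int MAX_RETRIES = 5;

    // Retry the create on ConnectionLoss, giving up after MAX_RETRIES attempts.
    public static String create(ZooKeeper zk, String path, byte[] data,
                                List<ACL> acl, CreateMode mode)
            throws KeeperException, InterruptedException {
        KeeperException lastError = null;
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            try {
                return zk.create(path, data, acl, mode);
            } catch (KeeperException.ConnectionLossException e) {
                lastError = e;                       // expected under load; back off and retry
                Thread.sleep(500L * (attempt + 1));  // simple linear backoff, illustrative only
            }
        }
        throw lastError;
    }
}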

It turns out that there have been soylatte NIO issues with Zookeeper. I talked to Satish (he's on the email thread in the link, and we both work at Evri), and he said he had success using the latest version of 1.6 that the Mac 'officially' supports.

I switched to the latest Apple-supported Java 1.6 version: when I pointed my java binaries at 1.6, Zookeeper worked great, but Eclipse couldn't restart -- some more online research showed that this was another known issue.

So in the end, I:
  1. created a java16 symlink to /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java
  2. used that symlink in zkServer.sh
  3. kept my $JAVA_HOME pointing to 1.5 by symlinking /System/Library/Frameworks/JavaVM.framework/Versions/1.5 to /System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK

Saturday, June 27, 2009

Seattle Rock n Roll Marathon Race Report

Well, the big event has come and gone. And I'm happy to have crossed the finish line under my own power! It was a great first marathon experience. To summarize, after not being able to run very much in the last six weeks due to 'life happening' and some late breaking bronchitis, I decided to run the marathon with no particular time goal in mind. However, if I could come close to my original goal of sub 4 hours, that would be gravy :)

I ended up getting 4:10 (according to my watch, which I stopped during a short but necessary bathroom break). The race went according to my sub-4 pace and plan until mile 21, when massive calf cramps in both legs dramatically slowed me down. Cramping of any kind is frustrating, because you have to basically shut it down completely. I wasn't even breathing hard, but I simply could not move any faster. I was able to fight the cramps off for a while, but by mile 25 they were pretty constant, and spreading from my calves to my groin and quads. In the last half mile I was cramping with every step, but I was basically in a tunnel of people at that point, and they cheered me on through the cramps. While it wasn't really 'fun' at the time, I'll never forget struggling down the finishing stretch and the support the crowd gave me that got me through it.

The highlight of my race was seeing my friends Lori and Anthony at the 18.5 mile mark (when I was still feeling pretty good), with their 'Run Arun' sign. Lori was training for the marathon when she injured her knee. In her place I would have been pouting and eating bon-bons on the couch, but she and Anthony came out and cheered us all on -- making the killer sign, giving me pretzels, gatorade and hugs (which must have been pretty gross since I was drenched in sweat). Thanks guys, seeing you both after the long grind uphill was exactly what I needed!

For a first marathon, the Seattle Rock n Roll was perfect. The weather was great, the bands were great (especially for a guy that doesn't train with an iPod), the water stations were perfectly placed, and the course was challenging, with the biggest hill coming at miles 15-18, running up the false flat of Highway 99. For me, the toughest part was running past the turnoff towards the finish at mile 23, and knowing I still had 3 long, cramp-filled miles to go. That was mental.

My cramping was probably due to the lack of recent training. I had trained for a specific pace, and kept that pace for the first 21 miles. But the time off caught up with me in the end, and my target pace was probably too fast for my current level of fitness. I guess I'll have to claim a 'moral' victory. And there will always be next year!


Tuesday, June 23, 2009

Streaming Data with a Worker/Agent based approach

Where I was going....
In my last post I described how, at work, we were investigating using Hadoop in a non-batch setting. I mentioned that despite not using Hadoop's ability to collate keyed data from large data sets, we were still investigating Hadoop because of the built-in robustness of the system:
  • Nodes are checked via 'heartbeat'.
  • Task status is centrally tracked.
  • Failed tasks are retried.
  • Work is pulled from the central JobTracker by TaskTrackers.
The basic pain points of maintaining highly available and robust functionality across a cluster of machines are taken care of, and that was the primary motivator for us to try to stream data across a batch-driven system.

However, as we moved into implementation it became fairly obvious that we were pounding a square peg into a round hole. A lot has been written about how Hadoop and HDFS don't work particularly well with small files -- the recommended solutions usually involve concatenating those files into something bigger to reduce the number of seeks per map job. While these problems were understandable in a system optimized to process huge amounts of data in batch, waiting to batch up large files wasn't an option given the low-latency requirement of our end users.

Especially disconcerting was the amount of work (and code) spent bundling queued work items into small files and submitting those files as individual jobs. The standard worker model -- multiple processes, with multiple threads per process, running on multiple machines, each accessing SQS and processing the data -- seemed so much simpler than creating artificial batches.

A Swift Change of Direction
The rewrite took a matter of hours, dropped out a lot of code, and was a minor change to the overall architecture, which uses SQS to transition between workflow states and S3 to persist the results of data transformations. The move away from Hadoop was limited to the intermediate worker processes -- we still use Hadoop to get the data into the system, because we are collating data across a set of keys when importing it. The latency went from somewhat indeterminate across mini-batches to being the average processing time per thread. And the workers were easily implemented as Callable tasks -- developers could implement a new worker by overriding a single method that took a string as input. When the latency of the system went up, simply adding more machines running more processes would take care of the problem.
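
To give a flavor of the worker interface, here is an illustrative sketch -- the class and method names are mine, not our actual code. The base worker is a Callable that loops pulling work items (modeled here with an in-memory BlockingQueue standing in for SQS) and hands each one, as a string, to the single method a developer overrides:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public abstract class Worker implements Callable<Void> {
    private final BlockingQueue<String> queue;   // stands in for the SQS queue we actually read from
    private volatile boolean running = true;

    protected Worker(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    // The single method a developer overrides to implement a new worker.
    protected abstract void process(String workItem) throws Exception;

    public void shutdown() {
        running = false;
    }

    public Void call() throws Exception {
        while (running) {
            String item = queue.poll(1, TimeUnit.SECONDS);
            if (item != null) {
                process(item);   // an uncaught exception here kills the thread -- more on that below
            }
        }
        return null;
    }
}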

Distributed Availability and Retry Logic
Of course, that simplicity came with a price tag -- we lost the distributed bookkeeping that Hadoop provided. Specifically, we would have to implement:
  1. thread and process failure detection
  2. machine failure detection
  3. retry logic
All of which is non-trivial to implement. However, our need to stream rather than batch data meant that we would have ended up doing the retry logic differently than Hadoop does anyway. We need to catch and retry failures at the work-item level, not at an arbitrarily determined file-split level.

Our retry logic is pretty simple, and uses S3 to persist workflow state per work item. We traverse the list of items in the queue, determine which ones have 'stalled out', and submit them to the appropriate queue as part of a retry. At the same time we clean up work items that have been fully processed, and compute the average processing time per workflow process. These three things are best done asynchronously, as -- you guessed it -- Hadoop jobs, because they need to take advantage of Hadoop's collation functionality.
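
Setting aside for a moment that we actually run these sweeps as Hadoop jobs, the retry pass itself is conceptually as simple as the sketch below. The WorkItemStore and QueueClient interfaces are hypothetical stand-ins for our S3 and SQS wrappers, and the stall threshold is made up:

import java.util.List;

public class RetrySweep {
    // Hypothetical stand-ins for our S3-backed workflow state and SQS wrappers.
    interface WorkItem {
        String id();
        String currentQueue();        // the workflow stage the item is sitting in
        long lastUpdatedMillis();     // when its state was last persisted to S3
        boolean isComplete();
    }
    interface WorkItemStore {
        List<WorkItem> listItems();
        void delete(WorkItem item);
    }
    interface QueueClient {
        void send(String queueName, String workItemId);
    }

    private static final long STALL_THRESHOLD_MS = 30 * 60 * 1000L;   // made-up threshold

    public void sweep(WorkItemStore store, QueueClient queues) {
        long now = System.currentTimeMillis();
        for (WorkItem item : store.listItems()) {
            if (item.isComplete()) {
                store.delete(item);                           // clean up finished work items
            } else if (now - item.lastUpdatedMillis() > STALL_THRESHOLD_MS) {
                queues.send(item.currentQueue(), item.id());  // resubmit stalled work
            }
        }
    }
}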

Our thread failure logic is also pretty simple. Because I'm starting up Callable tasks and making them run until I shut them down, I can check to see if any of them have finished prematurely by calling isDone() on the Futures returned when submitting them to the ExecutorService.
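
In sketch form (names and pool size are illustrative, not our actual code), that check looks something like this:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WorkerRunner {
    private final ExecutorService pool = Executors.newFixedThreadPool(8);   // pool size is illustrative
    private final List<Future<Void>> futures = new ArrayList<Future<Void>>();

    // Submit a long-running worker and hold on to its Future so we can watch it.
    public void start(Callable<Void> worker) {
        futures.add(pool.submit(worker));
    }

    // A Future that is done before we asked the worker to shut down means the
    // thread exited prematurely -- most likely from an uncaught exception.
    public boolean anyWorkerDied() {
        for (Future<Void> f : futures) {
            if (f.isDone()) {
                return true;
            }
        }
        return false;
    }
}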

Process failure can be monitored (and logged) by a watchdog program. Repeated process failure in this case is symptomatic of an uncaught exception being thrown in one of the process threads.

Machine failure is also easily monitorable. I need to expose a simple service on each machine to detect process and thread failures, and if that process is not reachable, I can assume that the machine is offline.
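
A minimal sketch of that per-machine service, assuming Java 6's built-in HttpServer and the WorkerRunner sketch above (the port and response strings are made up), might look like this:

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;

public class HealthService {
    // Expose a tiny /health endpoint; a watchdog that can't reach this port
    // on a machine can assume the machine (or the process) is offline.
    public static void start(final WorkerRunner runner) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(8088), 0);   // port is made up
        server.createContext("/health", new HttpHandler() {
            public void handle(HttpExchange exchange) throws IOException {
                byte[] body = (runner.anyWorkerDied() ? "DEGRADED" : "OK").getBytes();
                exchange.sendResponseHeaders(200, body.length);
                OutputStream os = exchange.getResponseBody();
                os.write(body);
                os.close();
            }
        });
        server.start();
    }
}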

These may be fairly limited and crude methods of getting a highly available system in place, but they feel like the right primitives to implement because while I don't know why the system is going to fail, each of these methods gives me a way to know how it is failing.

The Conclusion (so far)
The morals of the story at this point are:
  1. Frameworks can be extremely powerful when used for their strengths, and extremely limiting when used for their secondary benefits. When it feels like I'm pounding a square peg into a round hole, I probably am. I think this is called 'design smell', and now that I know what it smells like, I'll start backing up a lot sooner in an effort to find the right tool for the job.
  2. It is always a good sign when a refactoring drops out lots of code.
  3. Having to implement the availability and robustness of the system we are writing has actually made it easier to understand. Even though we are implementing functionality that we once got for free, at least we understand the limitations of the availability and robustness solutions we put in place.

Monday, June 8, 2009

Streaming Data with Hadoop and SQS -- a work in progress

Hadoop is by design a batch oriented system. Take a bunch of data, run a series of maps and reduces with it across X machines, and come back when it's done. A Hadoop cluster has high throughput and high latency. In other words, it takes a while, but a lot of stuff gets done.

At work, I'm leading a team that is implementing a data processing pipeline. The primary use case that we need to address involves quickly handling changes in the entity data we care about -- people, places, and things that act or act upon other people, places and things. Specifically, this means:
  • detect that there has been a change
  • get the changed data into the system
  • apply a series of transforms to it for downline rendering and searching systems
This use case requires a streaming solution. Each piece of data needs to be transformed and pushed into production relatively quickly. Note that there is an acceptable latency. If, for instance, Famous Actor X dies, our system needs to detect it and update the data within the hour. However, detecting/updating data within a day would be too slow to be useful to someone wanting to know what was up with Famous Actor X.

At first glance, Hadoop is not a good fit for this use case. It takes file-based inputs and produces file-based outputs, so any individual piece of data moving through the system is limited by the speed at which an entire input set can be run through a cluster.

However, Hadoop has the following features that make it ideal for distributing work:
  1. The MapReduce abstraction is a very powerful one that can be applied to many different kinds of data transformation problems.
  2. The primary Mapper and Reducer interfaces are simple enough to allow many different developers to ramp up on the system in minimal time.
  3. HDFS allows the developer to not worry about where they put job data.
  4. Cluster setup and maintenance, thanks to companies like Cloudera (who fixed the issues I was seeing with S3, thanks!), is taken care of.
  5. The logic around distributing the work is completely partitioned away from the business logic that actually comprises the work.
  6. Jobs that fail are re-attempted.
  7. I'm sure there's more...
As our system scales, we can only assume that the number of concurrent inputs, and therefore the system load, will grow. If we were to take the lowest-initial-effort route and write our own multithreaded scheduler, we would have a much more straightforward solution. However, as the workload grows to swamp a single machine, we would eventually end up having to deal with the headaches of distributed computing -- protocol, redundancy, failover/retry, synchronization, etc.

In fact, I was fully intending to write a quick scheduler app to stream data and throw it away once we reached a scale that required distribution, at which point I was going to use Hadoop. However, I soon realized that I would be solving problems that Hadoop had already addressed -- scheduling, data access, retry logic -- and those were just the initial, non-distributed issues.

Still, there is the high latency and inherent batchiness of Hadoop. To work around the high latency, we're trying to enable streaming between MapReduces via SQS. The input of entity data into the system and the various transforms of that data can be treated as a series of MapReduce clusters, where the transformation is done during the map and any necessary collation is done during the reduce. The Reduce phase of each MapReduce can then send a notification for each piece of data it has processed to the next MapReduce cluster.
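
As a sketch of that hand-off (using the old org.apache.hadoop.mapred Reducer API; the notification method is abstracted out because the SQS client we use isn't shown in this post), the reduce writes its output as usual and then tells the next stage that the key is ready:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public abstract class NotifyingReducer extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {

    // In the real system this drops a message on the next stage's SQS queue;
    // it is abstracted out here because our SQS client isn't shown in this post.
    protected abstract void notifyNextStage(String key) throws IOException;

    public void reduce(Text key, Iterator<Text> values,
                       OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        while (values.hasNext()) {
            output.collect(key, values.next());   // persist the transformed data as usual
        }
        notifyNextStage(key.toString());          // tell the next stage this key is ready
    }
}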

Of course, it is really inefficient to run MapReduce over a single piece of data at a time. So the listener on each SQS queue buckets incoming messages, using a max-messages/max-time threshold to aggregate data. When a threshold is reached, the system writes the collected messages to a file and starts a MapReduce job on it. This is mini-batching, and as long as it delivers the data within the specified system latency, it's all good.
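
The bucketing logic itself is straightforward. Here is a sketch with the SQS queue modeled as an in-memory BlockingQueue and made-up thresholds -- the real listener reads from SQS, but the max-messages/max-time logic is the same idea:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class MiniBatcher {
    private final BlockingQueue<String> incoming;   // stands in for the SQS queue
    private final int maxMessages;
    private final long maxWaitMillis;

    public MiniBatcher(BlockingQueue<String> incoming, int maxMessages, long maxWaitMillis) {
        this.incoming = incoming;
        this.maxMessages = maxMessages;
        this.maxWaitMillis = maxWaitMillis;
    }

    // Blocks until either maxMessages have arrived or maxWaitMillis has elapsed,
    // then returns the batch; the caller writes it to a file and kicks off a job.
    public List<String> nextBatch() throws InterruptedException {
        List<String> batch = new ArrayList<String>();
        long deadline = System.currentTimeMillis() + maxWaitMillis;
        while (batch.size() < maxMessages) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                break;                              // hit the max-time threshold
            }
            String msg = incoming.poll(remaining, TimeUnit.MILLISECONDS);
            if (msg == null) {
                break;                              // timed out waiting for more messages
            }
            batch.add(msg);
        }
        return batch;
    }
}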

What we are doing is definitely a work in progress, because there are several 'dials' in this process that need to be tuned and tweaked based on the size of the input data:
  1. Number of messages/amount of time to wait. Of course, the larger the input files, the more 'efficient' the system. On the other hand, making those files larger may imply an increase in latency.
  2. Number of concurrent jobs -- there is a sweet spot here; it (again) doesn't make sense to launch 100 concurrent jobs. We will need to pay attention to how long a job takes before deciding to adjust the number of concurrent jobs up or down.
  3. Number of transformations -- the bucketing required implies that every transform has a built-in latency, which factors into the overall latency.
  4. Cluster size -- it makes no sense to run 20 nodes where 2 would suffice, but there will be times when 20 is necessary.
Some of the dials may be adjustable on the fly -- sending messages to a cluster via an 'admin' queue allows us to change message size/max time/concurrent job numbers dynamically. Other dials may require a stop-reconfig-start of a cluster. One benefit of using SQS is that no messages are lost while we tweak these 'hard stop' dials.

We're not doing a classical MapReduce where we transform data and then collate it under a distinct key. We're doing simple transformations with the data. The reduce piece isn't really needed, because there is no collation to be done. Plenty of people use MapReduce this way, primarily because it allows them to easily decompose their logic into parallel processing units.

We are even going further by undermining one of the key strengths of MapReduce, the ability to operate on large chunks of data, and instead running lots of smaller jobs concurrently. The Hadoop framework makes this possible. I'm not sure how optimal this is, and expect that we will be tweaking the message size and concurrency dials for some time. One of the advantages that the underlying Hadoop framework offers us is flexibility, as evidenced by the kinds of dials we can tweak to get optimal system throughput and latency.

I'll keep updating as we learn more about how the system behaves and what the 'right' settings of the dials are based on the amount of data being input. I don't know if this is necessarily the most correct or elegant way to build a pipeline, but I do know that it's a good sign that the Hadoop framework is this flexible.

Monday, June 1, 2009

logrotate and scripts that just can't let go.

I just found out that a logrotate job I had configured X months ago wasn't working when the lead came up to me and said 'what the $#!k is a 5GB file doing in my /var/log?' He was pissed because this was the second time logrotate had not been configured correctly (both times, my fault). The first time, we discovered logrotate.conf cannot have comments in it. This time, it looked like logrotate had run, but the script had kept the filehandle to the old log file open and was continuing to log to the rotated file.


The fix is to get the script to let go of the old filehandle and start writing to the new log file after rotation. One way to do this is to send a HUP to the process in the postrotate script of the logrotate config. That would mean modifying the script to trap the HUP signal, release the filehandle, get a handle to the new (same name) log file, and keep rolling on. I decided not to do this because it involved modifying the original script, and I didn't have that much time to relearn things.

The second way, which is what I ended up doing, was to kill and restart the process during postrotate. Here is the config file in /etc/logrotate.d:

/var/log/response_process.log {
    daily
    missingok
    rotate 52
    compress
    delaycompress
    notifempty
    create 640 root adm
    sharedscripts
    postrotate
        kill $(ps ax | grep process.rb | grep -v 'grep' | awk '{print $1}')
        ruby process.rb -logfile:/var/log/response_process.log > /dev/null 2>&1
    endscript
}

It's kind of hacky, but I didn't have to expend any mental effort making sure that the filehandle was truly closed in the script.