Thursday, August 27, 2009

Zookeeper and Concurrency

Recently I ran into a problem that seemed to require more of a solution than I was willing to implement. We are currently migrating an application from a single worker and a single database to multiple workers running in parallel, using S3 as the primary means of storage. (Side note: this migration is only possible because the application doesn't actually require any of the unplanned, interactive queries that only a database is good at. The choice of a database as the persistence mechanism for this application was not a popular one, and it only grew less popular as more time was spent putting out database-related fires than implementing new features.) One of the legacy requirements of the system, which supports an in-production website, was that the IDs of all new items had to be distinct integers.

Without this legacy requirement I would have put in some kind of GUID scheme and called it an (early) day. The existence of previous items with IDs that other services relied upon made a GUID generation scheme impossible. The requirement of distinct integers, however, requires coordination between the agents, which must make sure they never create the same ID for different objects.

My initial thought was to implement a simple service that handed out an integer and auto-incremented it with every request. The big problem with this approach is that the service would be a massive, non-redundant bottleneck unless it too was replicated, and then it would face the same problem the original workers faced with respect to keeping integers synchronized across different processes.

So I had a bad feeling about starting down that path, and was putting it off, when a colleague at work suggested that I check out Zookeeper. Zookeeper was created by Yahoo! Research specifically to solve the kind of synchronization problems I was having, in a highly performant, fault-tolerant way. In other words, this was the service that I was trying not to write :)

Zookeeper, at 10,000 feet, consists of multiple servers that together maintain a hierarchical namespace of nodes, each of which can have child nodes. Each node can also have associated data, limited to under 1MB, meant to be used for coordination/synchronization.
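For the sake of illustration, here's a minimal sketch (not code from our system) of connecting to an ensemble and creating a node with the standard org.apache.zookeeper Java client. The connect string, timeout, path, and initial value are all placeholders:

// Sketch only: assumes an ensemble reachable at localhost:2181 (placeholder).
ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, new Watcher() {
    public void process(WatchedEvent event) {
        // connection state changes arrive here; ignored in this sketch
    }
});

// Create a persistent node holding an 8-byte counter, starting at zero.
// "/sequences/itemId" is a hypothetical path.
byte[] initialValue = ByteBuffer.allocate(8).putLong(0L).array();
zk.create("/sequences/itemId",
          initialValue,
          ZooDefs.Ids.OPEN_ACL_UNSAFE,   // open ACL, fine for a sketch
          CreateMode.PERSISTENT);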

Zookeeper, in the words of its creators, "needed to be general enough to address our coordination needs and simple enough to implement a correct high performance service. We found that we were able to achieve [this] by trading strong synchronization for strong ordering guarantees and a wait-free interface."

What that means is that node access is wait-free: when you ask for data you get it, but you do not have an exclusive lock on it. This is quite different from the mutex-based locking model I'm used to, and at first I didn't see how I could guarantee unique IDs to multiple agents creating multiple items without getting an exclusive lock on the data and making a modification.

What I didn't get (until another colleague walked me through some of his code) is that any and all changes to the data are accepted, and versioned. When I request data, I get back an object corresponding to the current version of that data. When I submit data, I can specify that the submit should only succeed if the data (and therefore its version) hasn't been updated since the version I retrieved. So if I get node data that is an integer ID, increment it, and try to write it back, two things can happen (excluding connection loss, which must also be dealt with):
  1. I can return successfully, meaning that my change was accepted because no one else had made changes since I had retrieved the data.
  2. I can get a Bad Version exception, which means I need to get the data again, and try to re-increment the new value.
The code below shows the method that requests, receives, and attempts to increment the data:


public String getNextId() throws Exception {

    String hexId = null;

    while (hexId == null) {
        ZooKeeper zk = getZooKeeper();
        try {
            // nodeStat is filled in with the version of the data we read.
            Stat nodeStat = new Stat();
            byte[] data = getDataWithRetries(zk, sequenceName, nodeStat);

            // The node's data is a raw 8-byte counter; bump it by one.
            ByteBuffer buf = ByteBuffer.wrap(data);
            long value = buf.getLong();
            value++;
            buf.rewind();
            buf.putLong(value);

            try {
                // Conditional write: only succeeds if no one else has
                // changed the node since we read it at nodeStat's version.
                setDataWithRetries(zk, sequenceName, buf.array(), nodeStat);
                hexId = Long.toHexString(value);
            }
            catch (KeeperException e) {
                if (!e.code().equals(Code.BADVERSION)) {
                    throw e;
                }
                // BADVERSION: someone beat us to it; loop and re-read.
            }
        } finally {
            // always need to close out the session!
            zk.close();
        }
    }

    return hexId;
}

I've wrapped session creation in a getZooKeeper() method, and pass the retrieved ZooKeeper instance into two methods: getDataWithRetries() and setDataWithRetries(). Both methods try to recover from connection losses as best they can.
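I won't walk through getZooKeeper() in detail, but a minimal sketch of what such a helper might look like is below; CONNECT_STRING and SESSION_TIMEOUT are hypothetical constants, and the latch simply blocks until the session is actually connected before handing it out:

// Sketch only: CONNECT_STRING and SESSION_TIMEOUT are made-up constants.
protected ZooKeeper getZooKeeper() throws IOException, InterruptedException {
    final CountDownLatch connected = new CountDownLatch(1);
    ZooKeeper zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {
        public void process(WatchedEvent event) {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        }
    });
    // Don't hand out the session until it is actually established.
    connected.await();
    return zk;
}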

The getDataWithRetries() method takes a ZooKeeper instance, the path to the node being accessed, and a Stat structure that will be filled in with the retrieved data's version information. It returns the retrieved data as a byte array. Note that in this method I only try to recover from connection losses, because this is a read operation.


protected byte[] getDataWithRetries(
        ZooKeeper zooKeeper,
        String path,
        Stat nodeStat) throws Exception {

    byte[] data = null;
    int i = 0;

    while (i < RETRY_COUNT) {
        try {
            i++;
            // nodeStat is populated with the node's current version,
            // which we need later for the conditional setData().
            data = zooKeeper.getData(path, false, nodeStat);
            break;
        }
        catch (KeeperException e) {
            if (e.code().equals(Code.CONNECTIONLOSS)) {
                // transient: retry up to RETRY_COUNT times
                continue;
            }
            throw e;
        }
    }

    if (data == null) {
        throw new KeeperException.ConnectionLossException();
    }

    return data;
}


Once I have the data, I increment it. Note that Zookeeper data is always kept as a byte array, so I convert it in order to increment it:

ByteBuffer buf = ByteBuffer.wrap(data);
long value = buf.getLong();
value++;
buf.rewind();
buf.putLong(value);

and then try to submit it back to Zookeeper. This is where things get interesting: if someone has modified the data before I could get back to it, I need to get the new value and try again. In the setDataWithRetries() method below, I only handle connection exceptions, and blow out if there is a BADVERSION exception:



protected Stat setDataWithRetries(
        ZooKeeper zooKeeper,
        String path,
        byte[] data,
        Stat stat) throws Exception {

    int i = 0;
    Stat statFromSet = null;

    while (i < RETRY_COUNT) {
        try {
            i++;
            // Conditional write: fails with BADVERSION if the node has
            // changed since 'stat' was read.
            statFromSet = zooKeeper.setData(path, data, stat.getVersion());
            break;
        }
        catch (KeeperException e) {
            if (e.code().equals(Code.CONNECTIONLOSS)) {
                // transient: retry up to RETRY_COUNT times
                continue;
            }
            else if (e.code().equals(Code.BADVERSION)) {
                // differentiating for debug purposes
                throw e;
            }
            else {
                throw e;
            }
        }
    }

    if (statFromSet == null) {
        throw new KeeperException.ConnectionLossException();
    }

    return statFromSet;
}

The calling code of setDataWithRetries() handles the BADVERSION exception by getting the data again, and retrying the submit:

try {
    setDataWithRetries(zk, sequenceName, buf.array(), nodeStat);
    hexId = Long.toHexString(value);
}
catch (KeeperException e) {
    if (!e.code().equals(Code.BADVERSION)) {
        throw e;
    }
    // BADVERSION: someone beat us to it; loop and re-read.
}


So each agent tries to get an ID until it succeeds, at which point it knows it has a unique one.

The thing I really like about the strong versioning and ordering approach, now that I understand it, is that it acknowledges concurrency and makes it easy to deal with. Locking, on the other hand, seems like an attempt to abstract away concurrency by enforcing serialization, which works OK when you are managing machine- or process-local resources, but can have huge performance impacts when you are trying to synchronize access across multiple machines.

The next thing I'm considering using Zookeeper for is configuration changes. Right now I push configuration changes out to my worker nodes by hand and force a restart via their web service interface. I would like them to be able to reload themselves automatically when the configuration state changes. This is a step up from the simple code detailed in this post: it means I need to use Zookeeper's notification capabilities to alert listening processes when the configuration changes.
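Roughly, that would mean registering a watch when reading a config node, and re-registering it every time it fires, since Zookeeper watches are one-shot. Here's a sketch, not working code from my system; the "/app/config" path and the reloadConfiguration() hook are made up:

// Sketch only: "/app/config" and reloadConfiguration() are hypothetical.
private void watchConfig(final ZooKeeper zk) throws Exception {
    Watcher configWatcher = new Watcher() {
        public void process(WatchedEvent event) {
            if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                try {
                    // Watches fire only once, so re-register by reading
                    // the node (and the new config) again.
                    watchConfig(zk);
                    reloadConfiguration();
                } catch (Exception e) {
                    // log/handle in real code
                }
            }
        }
    };
    // Read the current config and leave a watch behind for the next change.
    byte[] config = zk.getData("/app/config", configWatcher, null);
    // ... parse 'config' and apply it ...
}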

Tuesday, August 11, 2009

Using java.util.concurrent.CountDownLatch to synchronize startup/shutdown

I've been a big fan of the Java concurrency library since I stumbled upon it a while back. Before it came along, I was relegated to writing my own thread pools, schedulers, etc., which meant, of course, that I was introducing lots of subtle, devious bugs into code that had nothing to do with the actual product I was delivering.

The java.util.concurrent library freed me up to focus on what I was really trying to deliver instead of re-inventing a hard-to-write wheel. Plus, its authors are way smarter than me about concurrency. I highly recommend reading Java Concurrency in Practice, even if you don't code in Java, because the concurrency issues it discusses are universal, even if the solutions are in Java.

In the latest installment of 'how java.util.concurrent made me a happier, more productive developer', I was implementing a web service layer to control the run state of a set of worker threads. These workers needed to be started/stopped/paused/resumed/{insert favorite action here}. I didn't want to continue processing on the calling thread (the web service start/stop/etc. methods) until I was sure that the action requested by the caller had completed across all worker threads, which run asynchronously.

My first thought was to write a pair of interfaces that allowed me to signal when an action was requested and when it was completed. In other words:

public interface Worker {
    public void start(Master master);
    public void stop(Master master);
    public void pause(Master master);
    // ... and so on for each action
}

public interface Master {
    public void started(Worker worker);
    public void stopped(Worker worker);
    public void paused(Worker worker);
}

The problem with these interfaces and this design is that every time I needed to add an action to Worker, I needed to add a corresponding 'completed' message to Master. Also, the implementation of Master would need to track each worker against a worker pool, and scan that pool to see if an action had completed across all workers. Clearly way too much work to write, let alone understand three months later. I also knew that this was a pretty common problem, probably already solved by the concurrency library. So I cracked open the book....

java.util.concurrent.CountDownLatch is, in the words of the guys who wrote the library, "a synchronizer that can delay the progress of threads until it reaches its terminal state". Hmm. Using the synchronizer frees me from having to track the specific state of N specific workers:

public interface Worker {
    public enum RunState {
        STOPPING,
        STOPPED,
        STARTING,
        STARTED,
        PAUSING,
        PAUSED,
        RESTARTING
    };
    public boolean start(TaskCompleted listener) throws Exception;
    public void stop(TaskCompleted listener) throws Exception;
    public void pause(TaskCompleted listener) throws Exception;
    public void restart(TaskCompleted listener) throws Exception;
    public void reload(TaskCompleted listener) throws Exception;
    public RunState getState() throws Exception;
}

public interface TaskCompleted {
    public void completed();
}


In the code above:
(1) I no longer care which action completed, or which worker completed it, which means
(2) I am no longer keeping state for N workers in order to return from the call.
(3) The TaskCompleted interface can be implemented as an anonymous class in the response.

The CountDownLatch is pretty simple: it blocks until its internal count reaches zero:
private void pauseWorkers() throws InterruptedException {
    final CountDownLatch waitForPause = new CountDownLatch(workers.size());

    for (WorkerBase worker : workers) {
        worker.pause(new TaskCompleted() {
            public void completed() {
                waitForPause.countDown();
            }
        });
    }

    waitForPause.await();

    // and now we're paused.
}

In the code above, the CountDownLatch is initialized to the number of workers I have. I iterate through the list of workers and perform the 'pause' action on each. Then I wait for the latch to be counted down to 0 before proceeding. I keep no state on the workers; I only care when they've completed their requested action. I suppose I could replace the anonymous implementation with an actual (dirt simple) implementation that takes the latch and counts it down.
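Something like the following sketch, where the class name is my own invention:

// Hypothetical named replacement for the anonymous TaskCompleted above.
public class LatchCountdownListener implements TaskCompleted {

    private final CountDownLatch latch;

    public LatchCountdownListener(CountDownLatch latch) {
        this.latch = latch;
    }

    public void completed() {
        // one worker finished its requested action
        latch.countDown();
    }
}

The loop body above would then become worker.pause(new LatchCountdownListener(waitForPause));.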