Thursday, August 27, 2009

Zookeeper and Concurrency

Recently I ran into a problem that seemed to require more of a solution than I was willing to implement. We are currently migrating an application from a single worker and a single database to multiple workers running in parallel, using S3 as the primary means of storage. (Side note: this migration is only possible because the application doesn't actually require any of the unplanned, interactive queries that only a database is good at. The choice of a database as the persistence mechanism for this application was never a popular one, and it only grew less popular as more time was spent putting out database-related fires than implementing new features.) One of the legacy requirements of the system, which supports an in-production website, is that the IDs of all new items must be distinct integers.

Without this legacy requirement I would have put in some kind of GUID scheme and called it an (early) day. But the existence of previous items with IDs that other services rely upon made a GUID generation scheme impossible. The requirement of distinct integers, on the other hand, requires coordination between the agents, which need to make sure they are not assigning the same ID to different objects.

My initial thought was to implement a simple service that handed out an integer and auto-incremented it with every request. The big problem with this approach is that the service would be a massive, non-redundant bottleneck unless it too was replicated, at which point it would face the same problem the original workers faced with respect to keeping integers synchronized across different processes.

So I had a bad feeling about starting down that path, and was putting it off, when a colleague at work suggested that I check out ZooKeeper. ZooKeeper was created by Yahoo! Research specifically to solve the kind of synchronization problems I was having, in a highly performant, fault-tolerant way. In other words, this was the service that I was trying not to write :)

ZooKeeper, at 10,000 feet, consists of multiple servers that together maintain a hierarchical namespace of nodes, each of which can have child nodes. Each node can have associated data, limited to under 1MB, meant to be used for coordination/synchronization.
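
For illustration, a counter node like the one used later in this post could be created once, up front, with a helper along these lines. This is only a sketch: the helper name and the "/idSequence" path are mine, not something from the post, and it assumes the org.apache.zookeeper classes plus java.nio.ByteBuffer are on hand.

// Hypothetical one-time setup for the counter node used below; the path
// "/idSequence" and the helper name are illustrative only.
protected void createCounterNode(ZooKeeper zk, String path) throws Exception {
    // the node's data holds the counter's starting value as an 8-byte long
    byte[] initial = ByteBuffer.allocate(8).putLong(0L).array();

    // a persistent node survives the session that created it
    zk.create(path,
              initial,
              ZooDefs.Ids.OPEN_ACL_UNSAFE,  // open ACL, fine for a sketch
              CreateMode.PERSISTENT);
}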

ZooKeeper, in the words of its creators, "needed to be general enough to address our coordination needs and simple enough to implement a correct high performance service. We found that we were able to achieve {this} by trading strong synchronization for strong ordering guarantees and a wait-free interface."

What that means is that node access is wait-free: when you ask for data you get it, but you do not hold an exclusive lock on it. This is quite different from the mutex-based locking model I'm used to, and at first I didn't see how I could use it to guarantee unique IDs to multiple agents creating multiple items without getting an exclusive lock on the data and making a modification.

What I didn't get (until another colleague walked me through some of his code) is that any and all changes to the data are accepted and versioned. When I request data, I get back the data along with its current version. When I submit data, I can specify that the write will only succeed if the data (and therefore its version) hasn't been updated since the version I retrieved. So if I get node data that is an integer ID, increment it, and try to write it back, two things can happen (excluding connection loss, which must also be dealt with):
  1. I can return successfully, meaning that my change was accepted because no one else had made changes since I had retrieved the data.
  2. I can get a Bad Version exception, which means I need to get the data again, and try to re-increment the new value.
The code below shows the method that requests, receives, and attempts to increment the data:


public String getNextId() throws Exception {

    String hexId = null;

    while (hexId == null) {

        ZooKeeper zk = getZooKeeper();
        try {
            // read the current counter value; nodeStat is filled in
            // with the version of the data we read
            Stat nodeStat = new Stat();
            byte data[] = getDataWithRetries(zk,
                                             sequenceName,
                                             nodeStat);

            // the counter is stored as a single 8-byte long
            ByteBuffer buf = ByteBuffer.wrap(data);
            long value = buf.getLong();
            value++;
            buf.rewind();
            buf.putLong(value);

            try {
                // conditional write: only succeeds if nobody else has
                // updated the node since we read it
                setDataWithRetries(zk,
                                   sequenceName,
                                   buf.array(),
                                   nodeStat);
                hexId = Long.toHexString(value);
            }
            catch (KeeperException e) {
                if (!e.code().equals(Code.BADVERSION)) {
                    throw e;
                }
                // BADVERSION: someone else incremented the counter first,
                // so loop around, re-read the node, and try again
            }
        } finally {
            // always need to close out the session!
            zk.close();
        }
    }

    return hexId;
}

I've wrapped session creation in a getZooKeeper() method, and pass the retrieved ZooKeeper instance into two methods: getDataWithRetries() and setDataWithRetries(). Both methods try to recover from connection losses as best they can.
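
getZooKeeper() itself isn't shown here; a minimal sketch of what such a helper might look like, assuming a connectString and sessionTimeoutMs field plus java.util.concurrent's CountDownLatch/TimeUnit and java.io.IOException, is something like:

protected ZooKeeper getZooKeeper() throws Exception {
    // hypothetical helper, not shown in the original post: open a session
    // and block until it is actually established
    final CountDownLatch connected = new CountDownLatch(1);

    ZooKeeper zk = new ZooKeeper(connectString, sessionTimeoutMs,
        new Watcher() {
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            }
        });

    if (!connected.await(sessionTimeoutMs, TimeUnit.MILLISECONDS)) {
        zk.close();
        throw new IOException("timed out connecting to ZooKeeper");
    }

    return zk;
}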

The getDataWithRetries() method takes a ZooKeeper instance, the path to the node being accessed, and a Stat structure that will be filled in with the retrieved data's version information. It returns the retrieved data as a byte array. Note that in this method I only try to recover from connection losses, because this is a read operation.


protected byte[] getDataWithRetries(
        ZooKeeper zooKeeper,
        String path,
        Stat nodeStat) throws Exception {

    byte data[] = null;
    int i = 0;

    while (i < RETRY_COUNT) {
        try {
            i++;
            // read the node data; the node's version information is
            // returned in nodeStat
            data = zooKeeper.getData(path,
                                     false,
                                     nodeStat);
            break;
        }
        catch (KeeperException e) {
            if (e.code().equals(Code.CONNECTIONLOSS)) {
                // transient failure: retry the read
                continue;
            }
            else {
                throw e;
            }
        }
    }

    if (data == null) {
        // all of our retries failed with connection loss
        throw new KeeperException.ConnectionLossException();
    }

    return data;
}


Once I have the data, I increment it. Note that ZooKeeper node data is always stored as a byte array, so I convert it in order to increment it:

ByteBuffer buf = ByteBuffer.wrap(data);
long value = buf.getLong();
value++;
buf.rewind();
buf.putLong(value);

and then try to submit it back to ZooKeeper. This is where things get interesting: if someone has modified the data before I could get back to it, I need to get the new value of the data and try again. In the setDataWithRetries() method below, I only handle connection exceptions, and blow out if there is a BADVERSION exception:



protected Stat setDataWithRetries(
        ZooKeeper zooKeeper,
        String path,
        byte data[],
        Stat stat) throws Exception {

    int i = 0;
    Stat statFromSet = null;

    while (i < RETRY_COUNT) {
        try {
            i++;
            // conditional write: fails with BADVERSION if the node has
            // been changed since the version recorded in stat
            statFromSet = zooKeeper.setData(path,
                                            data,
                                            stat.getVersion());
            break;
        }
        catch (KeeperException e) {
            if (e.code().equals(Code.CONNECTIONLOSS)) {
                // transient failure: retry the write
                continue;
            }
            else if (e.code().equals(Code.BADVERSION)) {
                // differentiating for debug purposes; the caller is
                // expected to re-read the node and try again
                throw e;
            }
            else {
                throw e;
            }
        }
    }

    if (statFromSet == null) {
        // all of our retries failed with connection loss
        throw new KeeperException.ConnectionLossException();
    }

    return statFromSet;
}

The calling code of setDataWithRetries() handles the BADVERSION exception by looping back around, getting the data again, and retrying the submit:

try {
    setDataWithRetries(zk,
                       sequenceName,
                       buf.array(),
                       nodeStat);
    hexId = Long.toHexString(value);
}
catch (KeeperException e) {
    if (!e.code().equals(Code.BADVERSION)) {
        throw e;
    }
    // BADVERSION: loop around, re-read the node, and try again
}


So each agent keeps trying to claim an ID until its write succeeds, at which point it knows it has a unique one.

The thing I really like about the strong versioning and ordering approach, now that I understand it, is that it acknowledges concurrency and makes it easy to deal with. Locking, on the other hand, seems like an attempt to abstract away the concurrency by enforcing serialization, which works well enough when you are managing machine- or process-local resources, but can have huge performance impacts when you are trying to synchronize access across multiple machines.
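
It's the same optimistic check-and-retry idea that java.util.concurrent uses within a single JVM; ZooKeeper just stretches it across machines. As a single-process analogy (plain Java, not ZooKeeper code):

import java.util.concurrent.atomic.AtomicLong;

// Single-JVM analogy for the versioned-write loop above: read the current
// value, compute the next one, and only publish it if nobody else has
// published first; otherwise re-read and retry.
public class LocalIdSource {

    private final AtomicLong counter = new AtomicLong(0L);

    public long nextId() {
        while (true) {
            long current = counter.get();                 // like getData() + version
            long next = current + 1;
            if (counter.compareAndSet(current, next)) {   // like setData(version)
                return next;                              // our write won
            }
            // someone else got there first: loop and try again,
            // just like handling BADVERSION
        }
    }
}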

The next thing I'm considering using ZooKeeper for is configuration changes. Right now I push configuration changes out to my worker nodes by hand and force a restart via their web service interface. I would like them to be able to reload themselves automatically when that state changes. This is a step up from the simple code detailed in this post: it means I need to use ZooKeeper's notification capabilities (watches) to alert listening processes when the configuration changes.
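
A rough sketch of what that might look like is below. The "/config"-style path and reloadConfiguration() are illustrative stand-ins rather than anything I've actually built, and since ZooKeeper watches are one-shot, the watch has to be re-registered after every notification:

// Hypothetical sketch of watching a configuration node; the path and
// reloadConfiguration() are illustrative, not from this post.
protected void watchConfig(final ZooKeeper zk, final String path) throws Exception {

    final Watcher watcher = new Watcher() {
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeDataChanged) {
                try {
                    // re-read the node and re-register this watcher, since
                    // ZooKeeper watches fire only once
                    byte[] config = zk.getData(path, this, null);
                    reloadConfiguration(config);
                } catch (Exception e) {
                    // log, and decide whether to retry or restart the worker
                }
            }
        }
    };

    // the initial read registers the first watch
    reloadConfiguration(zk.getData(path, watcher, null));
}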

6 comments:

  1. I've been working on this for a few days now, and I've found several solutions but none of them incredibly simple or lightweight. The problem is basically this: We have a cluster of 10 machines, each of which is running the same software on a multithreaded ESB platform.

  2. Seems like you should really grab 50 ids each time so there is less contention. The worst thing in a solution like this is that if it is heavily slammed you get back LOTS of BadVersionExceptions. Instead grab 50 and use up those 50 before grabbing the next 50 ;). Also, I believe hbase and most noSQL environments without zookeeper have a checkAndPut method like you describe on zookeeper.

  3. This post has been extremely useful for me, because I have to develop very similar functionality, and it is very hard to find documentation and examples of ZooKeeper on the Internet.
    I have tested your code, and I found an error, at least it is an error using the latest version of ZooKeeper (3.3.3).

    catch(KeeperException e) {
    if(e.code().equals(Code.BADVERSION)) {
    nodeStat = setDataStat;
    }
    }

    If a KeeperException occurs while trying to set the new data, then the stat returned is null, and since you assign the returned stat in the catch block, the next call to getData() will throw an exception because the stat is null. What I did is simply catch the exception and continue (checking the code).

  4. Great post. I found it very useful because I had to develop similar functionality for a distributed system, and there is very little information about ZooKeeper on the web.
    I want to correct just one line of your sample code, in order for it to work properly with the current latest ZooKeeper version (3.3.3): in setData(), if a KeeperException with the BADVERSION code is thrown, then the returned stat is null, so the code:

    catch(KeeperException e) {
    if(e.code().equals(Code.BADVERSION)) {
    nodeStat = setDataStat
    }
    }

    is not correct, because nodeStat is null and then, in the following call to getData() using nodeStat as a parameter, a NullPointerException will be thrown.

    Thanks a lot for your contribution!!!
    Regards.

    Replies
    1. Disagree with the NullPointerException part. The nodeStat is re-initialized every time inside the while loop, so it won't be null.

      But you can definitely say the section of code inside the catch block has no effect (because of the re-initialization of nodeStat) and thus should be removed.

  5. I needed this. Dude, you are the man... the documentation @ Zookeeper is pretty weird and generic.
