Without this legacy requirement I would have put in some kind of GUID scheme and called it an (early) day. The existence of previous items with IDs that other services relied upon ruled out a GUID generation scheme. However, the requirement of distinct integers means the agents must coordinate to make sure they never create the same ID for different objects.
My initial thought was to implement a simple service that handed out an integer and auto-incremented it with every request. The big problem with this approach is that the service would be a massive, non-redundant bottleneck unless it too was replicated, and then it would face the same problem that the original workers faced with respect to keeping integers synchronized across different processes.
So I had a bad feeling about starting down that path, and was putting it off, when a colleague at work suggested that I check out ZooKeeper. ZooKeeper was created by Yahoo! Research specifically to solve the kind of synchronization problems I was having, in a highly performant, fault-tolerant way. In other words, this was the service that I was trying not to write :)
ZooKeeper, in the words of its creators, "needed to be general enough to address our coordination needs and simple enough to implement a correct high performance service. We found that we were able to achieve {this} by trading strong synchronization for strong ordering guarantees and a wait-free interface."
What that means is that node access is wait-free: when you ask for data, you get it, but you do not get an exclusive lock on it. This is quite different from the mutex-based locking model I'm used to, and at first I didn't see how I could guarantee unique IDs to multiple agents creating multiple items without taking an exclusive lock on the data and making a modification.
What I didn't get (until another colleague walked me through some of his code) is that every change to the data is accepted, and versioned. When I request data, I get back the data as of a specific version. When I submit data, I can specify that the submit should only succeed if the data (and therefore its version) hasn't been updated since the version I read. So if I get node data that is an integer ID, increment it, and try to write it back, two things can happen (excluding connection loss, which must also be dealt with):
- I can return successfully, meaning that my change was accepted because no one else had made changes since I had retrieved the data.
- I can get a Bad Version exception, which means I need to get the data again, and try to re-increment the new value.
The code below shows the method that requests, receives, and attempts to increment the data:
import java.nio.ByteBuffer;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public String getNextId() throws Exception {
    String hexId = null;
    while(hexId == null) {
        ZooKeeper zk = null;
        try {
            Stat nodeStat = new Stat();
            zk = getZooKeeper();
            // read the current value; nodeStat receives its version
            byte data[] = getDataWithRetries(zk,
                sequenceName,
                nodeStat);
            ByteBuffer buf = ByteBuffer.wrap(data);
            long value = buf.getLong();
            value++;
            buf.rewind();
            buf.putLong(value);
            try {
                // conditional write: only succeeds if the version
                // is still the one we read above
                setDataWithRetries(zk,
                    sequenceName,
                    buf.array(), nodeStat);
                hexId = Long.toHexString(value);
            }
            catch(KeeperException e) {
                if(!e.code().equals(Code.BADVERSION)) {
                    throw e;
                }
                // BADVERSION: another agent updated the node first,
                // so loop around, re-read, and try again
            }
        } finally {
            // always need to close out the session!
            if(zk != null) {
                zk.close();
            }
        }
    }
    return hexId;
}
I've wrapped calls to ZooKeeper behind a getZooKeeper() method, and pass the retrieved ZooKeeper instance into two methods: getDataWithRetries() and setDataWithRetries(). Both methods try to recover from connection losses as best they can.
The getDataWithRetries() method takes a ZooKeeper instance, the path to the node being accessed, and a Stat structure that will receive the retrieved data's version information. It returns the retrieved data in a byte array. Note how in this method I only try to recover from connection losses, because this is a read operation and any other failure should bubble up:
protected byte[] getDataWithRetries(
    ZooKeeper zooKeeper,
    String path,
    Stat nodeStat) throws Exception {
    byte data[] = null;
    int i = 0;
    while(i < RETRY_COUNT) {
        try {
            i++;
            // no watch; nodeStat is filled in with the data's version
            data = zooKeeper.getData(path,
                false,
                nodeStat);
            break;
        }
        catch(KeeperException e) {
            if(e.code().equals(Code.CONNECTIONLOSS)) {
                // transient: retry the read
                continue;
            }
            else {
                // anything else is fatal for a read
                throw e;
            }
        }
    }
    if(data == null) {
        throw new KeeperException.ConnectionLossException();
    }
    return data;
}
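One thing getDataWithRetries() takes for granted is that the sequence node already exists and holds an 8-byte value. That setup isn't shown in this post, but a minimal seeding sketch might look like the following (seedSequence and initialValue are my own illustrative names, not code from my system; in my case the initial value would be something above the highest legacy ID):

import java.nio.ByteBuffer;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

// sketch: create the counter node with an initial 8-byte value
protected void seedSequence(ZooKeeper zk, String path, long initialValue)
    throws Exception {
    ByteBuffer buf = ByteBuffer.allocate(8);
    buf.putLong(initialValue);
    try {
        zk.create(path, buf.array(), Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
    }
    catch(KeeperException e) {
        if(!e.code().equals(Code.NODEEXISTS)) {
            throw e;
        }
        // NODEEXISTS: another agent seeded the node first, which is fine
    }
}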
Once I have the data, I increment it. Note that ZooKeeper data is always stored as a byte array, so I convert it in order to increment it:
ByteBuffer buf = ByteBuffer.wrap(data);
long value = buf.getLong();
value++;
buf.rewind();
buf.putLong(value);
and then try to resubmit it back to ZooKeeper. This is where things get interesting: if someone has modified the data before I could get back to it, I need to get the new value and try again. In the setDataWithRetries() method below, I only recover from connection losses, and blow out if there is a BADVERSION exception:
protected Stat setDataWithRetries(
    ZooKeeper zooKeeper,
    String path,
    byte data[],
    Stat stat) throws Exception {
    int i = 0;
    Stat statFromSet = null;
    while(i < RETRY_COUNT) {
        try {
            i++;
            // conditional set: only succeeds if the node's version
            // still matches the one we read
            statFromSet = zooKeeper.setData(path,
                data,
                stat.getVersion());
            break;
        }
        catch(KeeperException e) {
            if(e.code().equals(Code.CONNECTIONLOSS)) {
                // transient: retry the write. Note that if the first
                // attempt actually committed before the connection
                // dropped, the retry will fail with BADVERSION and
                // the caller will simply re-read and try again.
                continue;
            }
            else if(e.code().equals(Code.BADVERSION)) {
                // differentiating for debug purposes
                throw e;
            }
            else {
                throw e;
            }
        }
    }
    if(statFromSet == null) {
        throw new KeeperException.ConnectionLossException();
    }
    return statFromSet;
}
The calling code of setDataWithRetries() handles the BADVERSION exception by getting the data again and retrying the submit:
try {
    // conditional write: only succeeds if the version
    // is still the one we read above
    setDataWithRetries(zk,
        sequenceName,
        buf.array(), nodeStat);
    hexId = Long.toHexString(value);
}
catch(KeeperException e) {
    if(!e.code().equals(Code.BADVERSION)) {
        throw e;
    }
    // BADVERSION: another agent updated the node first,
    // so loop around, re-read, and try again
}
So each agent keeps trying to get an ID until it succeeds, at which point it knows it has a unique one.
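A quick way to see that property in action is a harness along these lines (a sketch of my own, not production code; it assumes getNextId() is in scope and plays each thread as an agent):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// hypothetical harness: threads grab IDs concurrently, and Set.add()
// returns false if any ID is ever handed out twice
public void checkUniqueness() throws Exception {
    final Set<String> seen =
        Collections.synchronizedSet(new HashSet<String>());
    ExecutorService agents = Executors.newFixedThreadPool(10);
    for(int t = 0; t < 10; t++) {
        agents.submit(new Runnable() {
            public void run() {
                try {
                    for(int j = 0; j < 100; j++) {
                        String id = getNextId();
                        if(!seen.add(id)) {
                            throw new IllegalStateException(
                                "duplicate ID: " + id);
                        }
                    }
                }
                catch(Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
    agents.shutdown();
    agents.awaitTermination(5, TimeUnit.MINUTES);
}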
The thing I really like about the strong versioning and ordering approach, now that I understand it, is that it acknowledges concurrency and makes it easy to deal with. Locking, on the other hand, seems like an attempt to abstract away the concurrency by enforcing serialization, which works OK when you are managing machine- or process-local resources, but can have huge performance impacts when you are trying to synchronize access across multiple machines.
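In fact it's the same optimistic pattern as compare-and-swap on a single machine. Here's a rough local analogue using java.util.concurrent (my illustration, not part of the ZooKeeper code):

import java.util.concurrent.atomic.AtomicLong;

// local analogue of the ZooKeeper loop: read the current value, compute
// the new one, and only commit if nobody changed it in the meantime;
// a failed compareAndSet() plays the role of the BADVERSION exception
public long nextLocalId(AtomicLong counter) {
    while(true) {
        long current = counter.get();  // like getData() + version
        long next = current + 1;
        if(counter.compareAndSet(current, next)) {  // like setData(version)
            return next;
        }
        // lost the race: re-read and try again
    }
}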
The next thing I'm considering using ZooKeeper for is configuration changes. Right now I push configuration changes out to my worker nodes by hand and force a restart via their web service interface. I would like them to be able to reload themselves automatically when state changes. This is a step up from the simple code detailed in this post: it means I need to use ZooKeeper's notification capabilities to alert listening processes when the configuration changes.
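I haven't written that yet, but the rough shape would be to register a watch when reading the config node and re-arm it each time it fires. A sketch only (configPath and reload() are placeholder names, not code from my system):

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// sketch: read the config node and register a watch; since ZooKeeper
// watches fire exactly once, re-register on every change
public void watchConfig(final ZooKeeper zk, final String configPath)
    throws Exception {
    Watcher watcher = new Watcher() {
        public void process(WatchedEvent event) {
            if(event.getType() == EventType.NodeDataChanged) {
                try {
                    // re-read the data and re-arm the watch
                    watchConfig(zk, configPath);
                }
                catch(Exception e) {
                    e.printStackTrace();
                }
            }
        }
    };
    Stat stat = new Stat();
    byte config[] = zk.getData(configPath, watcher, stat);
    reload(config);  // placeholder for the worker's reload hook
}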