Thursday, October 15, 2009

Using Zookeeper to Implement a Redundant Persistence Service

Preface

I've previously detailed how we use ZooKeeper to generate unique sequence IDs for items created by a pool of worker processes. ZooKeeper is the ideal solution for this problem because of its strict order and version enforcement, which allows completely non-blocking access to data, its redundancy, and its easy-to-understand node and data paradigm.

Our current approach has multiple clients (one per worker process) requesting the latest sequence ID from a ZooKeeper node. These sequence IDs are critical to the integrity of our data: they cannot be duplicated. Using ZooKeeper version information, multiple clients request values and try to update them at a specific version. The update fails if someone else has updated the node in the meantime, in which case the client handles that failure, gets the latest value, increments it, and tries the update again. This is a non-blocking approach to updating a value, so there is no system-level bottleneck.
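The read-increment-retry loop above can be illustrated without a running ensemble. The sketch below uses a hypothetical VersionedNode class that only simulates a znode's (data, version) pair in memory; a real client would call ZooKeeper's getData() and setData(path, data, version) instead, and a stale version would surface as a BadVersionException rather than a boolean.

```java
import java.util.concurrent.atomic.AtomicReference;

// In-memory stand-in for the version-checked update loop described above.
public class CasLoopSketch {

    static final class VersionedNode {
        // {value, version}, swapped atomically to mimic the server's behavior
        private final AtomicReference<long[]> state =
            new AtomicReference<>(new long[] { 0L, 0L });

        long[] getData() {
            return state.get();
        }

        // Fails (returns false) if the caller's version is stale,
        // like a BadVersionException from setData()
        boolean setData(long newValue, long expectedVersion) {
            long[] cur = state.get();
            if (cur[1] != expectedVersion) {
                return false;
            }
            return state.compareAndSet(cur, new long[] { newValue, expectedVersion + 1 });
        }
    }

    // Non-blocking increment: read value and version, attempt the update,
    // and on a version conflict re-read and retry.
    static long nextSequenceId(VersionedNode node) {
        while (true) {
            long[] data = node.getData(); // {value, version}
            long candidate = data[0] + 1;
            if (node.setData(candidate, data[1])) {
                return candidate; // our update won; the ID is unique
            }
            // another worker updated the node first; loop and try again
        }
    }

    public static void main(String[] args) {
        VersionedNode node = new VersionedNode();
        System.out.println(nextSequenceId(node)); // 1
        System.out.println(nextSequenceId(node)); // 2
    }
}
```

No client ever blocks waiting on a lock; the only cost of a collision is a re-read and another attempt.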

Redundancy Required

In order to make our system truly redundant, we need to account for what happens if all ZooKeeper instances go offline, by persisting the latest generated sequence IDs -- again, we absolutely must not duplicate IDs, to maintain system integrity. When we persist sequence IDs, it is possible to restart all ZooKeeper instances and have them pick up where they left off. Note that we can reduce the amount of persisting needed by 'reserving' a future ID, persisting it, and only modifying it when the generated IDs actually reach that value. In other words, persist ID 100, and update that value to 200 when the system generates ID = 200. This maintains ID uniqueness across system restarts at the loss of at most 100 values, which is a decent tradeoff.
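The reserve-ahead rule amounts to a one-line check. A minimal sketch, assuming a reserve block of 100 as in the example above (valueToPersist is a hypothetical helper, not from the actual service):

```java
public class ReserveAheadSketch {
    // Matches the example in the text: reserve in blocks of 100.
    static final long RESERVE_AMOUNT = 100;

    // Returns the next value to persist after generating currentId, or -1 if
    // no persist is needed. Persisting currentId + RESERVE_AMOUNT whenever the
    // generated IDs reach the last persisted boundary means a restart can
    // resume from the persisted value without ever reusing an ID, at the cost
    // of skipping at most RESERVE_AMOUNT values.
    static long valueToPersist(long currentId) {
        return (currentId % RESERVE_AMOUNT == 0) ? currentId + RESERVE_AMOUNT : -1;
    }

    public static void main(String[] args) {
        System.out.println(valueToPersist(100)); // 200: persist the next boundary
        System.out.println(valueToPersist(150)); // -1: no disk write needed
        System.out.println(valueToPersist(200)); // 300
    }
}
```

The win is that 99 out of every 100 generated IDs cost nothing beyond the in-memory update.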

Persistence via Watches

The simplest implementation of a persistence service takes advantage of ZooKeeper's watch functionality, which lets a client register for notifications when a node goes away or its value changes. The client gets notified every time a watched value changes, and receives an Event object with the details of the change. In our case, the client is a Persistence Service, which retrieves the location of the updated data from the Event object, retrieves the data, and determines whether it needs to reserve a future block of numbers as described above. Note that ZooKeeper watches are defined to be one-time triggers, so it is necessary to reset the watch if you want to keep receiving notifications about the data of a specific node, or about the children of a specific node.

You watch data by registering a Watcher callback interface with ZooKeeper. The Watcher interface defines the process() method, which handles a WatchedEvent parameter. The following process() method determines when to persist the next reserved value.

public void process(WatchedEvent event) {
    Stat stat = new Stat();

    EventType type = event.getType();
    String node = event.getPath();
    if (type.equals(EventType.NodeDataChanged)) {

        try {

            byte[] inData = getData(sessionZooKeeper, node, stat);
            currentSequenceID = SequenceHelper.longFromByteArray(inData);

            // reserve the next block once the generated IDs reach the
            // previously persisted boundary
            if (currentSequenceID % RESERVE_AMOUNT == 0) {
                persistSequenceID(node, currentSequenceID + RESERVE_AMOUNT);
            }

        } catch (Exception e) {
            logger.error(e);
        }

    } else if (type.equals(EventType.NodeDeleted)) {
        logger.error("data node " + node + " was deleted");
    }

    // every time you process a watch you need to re-register for the next one.
    try {
        addWatch(sessionZooKeeper, this, node);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

In this case, I point ZooKeeper to the Watcher object when I retrieve a ZooKeeper instance:

public ZooKeeper getZooKeeper() throws Exception {
    return new ZooKeeper(hostString, DEFAULT_TIMEOUT, watcher);
}

Watchers, as implied above, are bound to the lifecycle of the ZooKeeper object they are passed into. Watchers can also be established when checking whether a node exists, or when retrieving the children of a node, because the client may want to be notified when a node is created or when children of a node are created.

Redundancy and Leader Election/Succession

Of course, a persistence service is useless if it is not redundant, especially when the integrity of our data requires us to persist reserved sequence IDs. We only want one process at a time persisting sequence IDs. If that process goes down, we want another process to step in immediately. In other words, we want a single leader to be selected from a group of two or more independent processes, and we want immediate succession if that leader goes away.

In order to handle leader election and leader succession, the multiple persistence processes create and watch Sequential-Ephemeral nodes. Sequential-Ephemeral nodes have the following properties:
  1. Their names get a monotonically increasing counter appended, so each new node is numbered higher than any previously created one.
  2. They go away if the ZooKeeper instance that created them goes away.
The persistence processes use these two properties when they start up:
  1. They create a Sequential-Ephemeral node (note, this requires that they keep a ZooKeeper instance open for as long as they are alive, so that the node sticks around).
  2. They check to see if there is a lower-numbered Sequential-Ephemeral node.
  3. If not, they are the leader, and they register to watch for changes on the nodes used to track sequence IDs.
  4. If there is a lower-numbered Sequential-Ephemeral node, they register to watch that node. Specifically, they want to be notified if that node goes away.
  5. They only watch the nearest lower-numbered node. This avoids the 'swarm' that would happen if all lower nodes watched the single leader node, so succession is guaranteed to be orderly and performed by only one node at a time.

public synchronized void startListening() throws Exception {

    // create a persistent node to store Sequential-Ephemerals under.
    if (checkExists(sessionZooKeeper, SEQUENCE_MAINTAINER_LEADER_NODE) == false) {
        createNode(sessionZooKeeper, SEQUENCE_MAINTAINER_LEADER_NODE, null,
            CreateMode.PERSISTENT);
    }

    // sequential nodes end in /n_
    String path = SEQUENCE_MAINTAINER_LEADER_NODE + PREFIX;
    createdNode = createNode(sessionZooKeeper, path, null,
        CreateMode.EPHEMERAL_SEQUENTIAL);

    // this method transforms the sequential node string to a number
    // for easy comparison
    int sequence = sequenceNum(createdNode);

    // only watch the sequence nodes if we are the lowest node.
    if (doesLowerNodeExist(sequence) == false) {
        logger.debug("this node is the primary node");
        isPrimary = true;
        loadSequenceMaintainers();
    } else {
        // this node is a backup, watch the next lowest node for failure.
        isPrimary = false;

        watchedNode = getNextLowestNode(sequence);
        if (watchedNode != null) {
            logger.info("this node is not primary, it is watching " + watchedNode);
            boolean added = super.addWatch(sessionZooKeeper, this, watchedNode);
            if (added == false) {
                throw new SequenceIDDoesntExist(watchedNode);
            }
        } else {
            throw new SequenceIDDoesntExist(watchedNode);
        }
    }

    isListening = true;
}
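The sequenceNum() helper referenced above isn't shown in the post. A minimal sketch, assuming ZooKeeper's usual zero-padded counter suffix on sequential nodes (e.g. /leader/n_0000000042); the path layout is an assumption, and the real helper may differ:

```java
public class SequenceNumSketch {
    // Extracts the numeric counter ZooKeeper appends to a sequential node,
    // e.g. "/leader/n_0000000042" -> 42, so siblings can be compared as ints.
    static int sequenceNum(String nodePath) {
        int idx = nodePath.lastIndexOf('_');
        return Integer.parseInt(nodePath.substring(idx + 1));
    }

    public static void main(String[] args) {
        System.out.println(sequenceNum("/leader/n_0000000042")); // 42
    }
}
```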

In the code above, I've abstracted a lot of the details of interacting with ZooKeeper behind helper methods. Hopefully the method names make it clear what I'm doing. There is plenty of specific documentation about the ZooKeeper API; I'm more interested in showing the logic behind electing a leader.

The code above shows that only one process will be the leader. The other processes will be watching the next lowest node, waiting for it to fail. This watching, of course, is done via the Watcher::process() method:

public void process(WatchedEvent event) {

    EventType type = event.getType();
    String path = event.getPath();

    if ((path != null && path.equals(watchedNode)) && (type != null &&
        type.equals(EventType.NodeDeleted))) {

        try {
            int watchSequence = this.sequenceNum(watchedNode);
            if (this.doesLowerNodeExist(watchSequence) == false) {
                logger.debug("watcher of " + watchedNode + " is now primary");
                isPrimary = true;
                watchedNode = null;
                // now you are the leader! the previous leader has gone away.
                // note you are no longer watching anyone, so no need to
                // re-register the watch.
                loadSequenceMaintainers();
            } else {
                logger.debug("watcher of " + watchedNode + " is not yet primary");
                // there is a lower node that is not the leader,
                // so watch it instead.
                watchedNode = getNextLowestNode(watchSequence);
                boolean success = addWatch(sessionZooKeeper, this, watchedNode);
                if (success == false) {
                    throw new SequenceIDDoesntExist(watchedNode);
                }
            }
        } catch (Exception e) {
            // fail fast for now
            logger.error(e.getMessage());
            throw new RuntimeException(e);
        }
    }
    // ...
}

Note that if an intermediate process fails, the watching process gets the next available lowest node and watches it instead. If all intermediate processes fail, the watching process becomes the leader.
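The "watch the nearest lower-numbered sibling" rule reduces to a pure function over the sibling sequence numbers. A small sketch (nodeToWatch is a hypothetical helper, not the post's getNextLowestNode):

```java
import java.util.Arrays;
import java.util.List;

public class SuccessionSketch {
    // Given the sequence numbers of all live sibling nodes and our own number,
    // return the nearest lower number (the node to watch), or -1 if none
    // exists -- in which case we are the leader.
    static int nodeToWatch(List<Integer> siblings, int mine) {
        int best = -1;
        for (int s : siblings) {
            if (s < mine && s > best) {
                best = s;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Integer> siblings = Arrays.asList(1, 3, 4, 7);
        System.out.println(nodeToWatch(siblings, 7)); // 4: watch the nearest lower node
        System.out.println(nodeToWatch(siblings, 1)); // -1: lowest number, so leader
        // if node 4 dies, node 7 re-evaluates against the survivors:
        System.out.println(nodeToWatch(Arrays.asList(1, 3, 7), 7)); // 3
    }
}
```

Because every process watches exactly one lower sibling, a failure wakes exactly one watcher, which is what makes succession orderly rather than a stampede.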

Dynamic Node Detection

Once these long lived services are up and running, I don't want to have to restart them if I am adding another sequence node to be tracked. We do this all of the time, because we are running multiple instances of the data pipeline and tracking sequences across all of them.
This again can be handled by setting a watch, this time on the children of a top-level node, and restricting sequence-node creation to directly underneath that node. In other words, have a ZooKeeper node called /all-sequences and stick sequence-1...sequence-N underneath it. We set the watch on the node when we check to see if it has children:

    children = zooKeeper.getChildren(sequenceParentNode, true);

This registers the class that created the zooKeeper instance as the Watcher. In the process() handler, we detect whether children have been deleted or added. Unfortunately, we can only detect that the children underneath a node have changed, so it is up to us to determine which ones have been deleted and which ones have been added:

public void process(WatchedEvent event) {

    EventType type = event.getType();
    String path = event.getPath();
    // ...
    if (path != null && path.startsWith(sequenceParentNode) && (type != null)) {
        // getting this notification implies that you are a primary b/c that
        // is the only way to register for it.
        try {
            if (type.equals(EventType.NodeChildrenChanged)) {
                // figure out missing and new nodes (expensive, but this
                // only happens when a node is manually added)
                List<String> newSequences = new ArrayList<String>();
                List<String> missing = new ArrayList<String>();
                List<String> children = this.getChildren(sessionZooKeeper,
                    sequenceParentNode, false);
                for (String child : children) {
                    if (this.sequenceMaintainers.containsKey(child) == false) {
                        newSequences.add(child);
                    }
                }

                for (String currentSequence : sequenceMaintainers.keySet()) {
                    if (children.contains(currentSequence) == false) {
                        missing.add(currentSequence);
                    }
                }

                // add new sequences to the watch list
                for (String child : newSequences) {
                    String sequencePath = sequenceParentNode + "/" + child;
                    sequenceMaintainers.put(child,
                        new SequenceMaintainer(s3Accessor,
                            hosts, sequencePath, true));
                }

                for (String child : missing) {
                    sequenceMaintainers.remove(child);
                }
            }

            // re-register the watch on the parent node
            boolean success = addWatch(sessionZooKeeper, this,
                sequenceParentNode);
            if (success == false) {
                throw new SequenceIDDoesntExist(sequenceParentNode);
            }
        } catch (Exception e) {
            // fail fast for now
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
}

Conclusion

I'm pretty amazed at how much I was able to leverage two main pieces of functionality -- nodes and watches -- to build a redundant, persistent sequence-watching service. Once again it seems that picking the right primitives is what makes ZooKeeper so practical for distributed synchronization.
