Thursday, October 15, 2009

Using Zookeeper to Implement a Redundant Persistence Service

Preface

I've previously detailed how we use ZooKeeper to generate unique sequence IDs for items created by a pool of worker processes. ZooKeeper is the ideal solution for this problem because of its strict order/version enforcement, which allows completely non-blocking access to data, its redundancy, and its easy-to-understand node and data paradigm.

Our current approach has multiple clients (one per worker process) requesting the latest sequence ID from a ZooKeeper node. These sequence IDs are critical to the integrity of our data: they cannot be duplicated. Using ZooKeeper version information, multiple clients request values and try to update those values with a specific version. The update fails if someone else has updated the node in the meantime, so the client handles that failure, gets the latest value, increments it, and tries to update again. Again, this is a non-blocking approach to updating a value, so there is no system-level bottleneck.
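
The retry loop is easy to sketch without a live ZooKeeper ensemble. The VersionedStore class below is a stand-in I made up for illustration; against the real client you would call getData() to read the value along with its Stat, call setData() with the expected version, and catch KeeperException.BadVersionException on conflict. The retry logic is the same either way:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class OptimisticIncrement {

    // Stand-in for a ZooKeeper node: a value plus a version, where a write
    // succeeds only if the caller's expected version is still current.
    static class VersionedStore {
        private long value;
        private int version;

        synchronized long[] get() { return new long[]{value, version}; }

        // Mimics setData(path, data, version): fails if the version is stale.
        synchronized boolean compareAndSet(long newValue, int expectedVersion) {
            if (version != expectedVersion) return false; // someone beat us to it
            value = newValue;
            version++;
            return true;
        }
    }

    // Non-blocking increment: read, bump, try to write, retry on conflict.
    static long nextSequenceId(VersionedStore store) {
        while (true) {
            long[] snapshot = store.get(); // {value, version}
            long candidate = snapshot[0] + 1;
            if (store.compareAndSet(candidate, (int) snapshot[1])) {
                return candidate;
            }
            // another client updated the node first; loop and try again
        }
    }

    public static void main(String[] args) throws InterruptedException {
        VersionedStore store = new VersionedStore();
        Set<Long> ids = ConcurrentHashMap.newKeySet();
        // hammer the store from several "workers" and check no ID repeats
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 250; j++) ids.add(nextSequenceId(store));
            });
            workers[i].start();
        }
        for (Thread t : workers) t.join();
        System.out.println(ids.size()); // prints 1000: no duplicates
    }
}
```

No client ever blocks waiting on a lock held by another client; contention just costs the loser one extra read-modify-write pass.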

Redundancy Required

In order to make our system truly redundant, we need to account for what happens if all ZooKeeper instances go offline, by persisting the latest generated sequence IDs -- again, we absolutely must not duplicate IDs if we are to maintain system integrity. When we persist sequence IDs, it is possible to restart all ZooKeeper instances and have them pick up where they left off. Note that we can reduce the amount of persisting needed by 'reserving' a future ID, persisting it, and only moving it forward when the generated IDs actually reach that value. In other words, persist ID 100 up front; when the system actually generates ID 100, bump the persisted value to 200. This maintains ID uniqueness across system restarts at the cost of skipping at most 100 values, which is a decent tradeoff.
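
The reservation arithmetic is simple enough to sketch; the names here (RESERVE_AMOUNT, shouldPersist, nextCeiling) are mine, not from our codebase:

```java
public class ReservationMath {

    static final long RESERVE_AMOUNT = 100;

    // Only touch persistent storage when the live counter reaches the
    // previously reserved boundary.
    static boolean shouldPersist(long currentId) {
        return currentId % RESERVE_AMOUNT == 0;
    }

    // The new ceiling to persist: one full block ahead of the counter.
    static long nextCeiling(long currentId) {
        return currentId + RESERVE_AMOUNT;
    }

    public static void main(String[] args) {
        long persisted = RESERVE_AMOUNT; // initial reservation
        int writes = 0;
        for (long id = 1; id <= 1000; id++) {
            if (shouldPersist(id)) {
                persisted = nextCeiling(id);
                writes++;
            }
        }
        // 1000 generated IDs cost only 10 persistence writes; a restart
        // resumes from 'persisted', so no ID is ever handed out twice.
        System.out.println(writes + " writes, resume at " + persisted);
    }
}
```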

Persistence via Watches

The simplest implementation of a persistence service takes advantage of ZooKeeper's watch functionality, which lets a client register for notifications when a node goes away or its value changes. The client gets notified every time a watched value changes, and receives an Event object with the details of the change. In our case, the client is a Persistence Service, which retrieves the location of the updated data from the Event object, retrieves the data, and determines whether it needs to reserve a future block of numbers as described above. Note that ZooKeeper watches are defined to be one-time triggers, so it is necessary to reset the watch if you want to keep receiving notifications about the data of a specific node, or the children of a specific node.

You watch data by registering a Watcher callback interface with ZooKeeper. The Watcher interface declares a process() method that handles a WatchedEvent parameter. The following process() implementation determines when to persist the next reserved value.

public void process(WatchedEvent event) {
        Stat stat = new Stat();
        
        EventType type = event.getType();
        String node = event.getPath();
        if(type.equals(EventType.NodeDataChanged)) {
            
            try {
                
                byte inData[] = getData(sessionZooKeeper,event.getPath(),stat);
                currentSequenceID = SequenceHelper.longFromByteArray(inData);
                
                if(currentSequenceID % RESERVE_AMOUNT == 0) {
                    persistSequenceID(node,currentSequenceID+RESERVE_AMOUNT);
                }
                
            } catch (Exception e) {
                logger.error(e);
            }
            
        }
        else if(type.equals(EventType.NodeDeleted)) {
            logger.error("data node "+event.getPath()+" was deleted");
        }
        
        // every time you process a watch you need to re-register for the next one. 
        try {
            addWatch(sessionZooKeeper,this,node);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        
    }

In this case, I point ZooKeeper to the Watcher object when I retrieve a ZooKeeper instance:

public ZooKeeper getZooKeeper() throws Exception {
        
        return new ZooKeeper(hostString,DEFAULT_TIMEOUT,watcher);
    }

Watchers, as is implied above, are bound to the lifecycle of the ZooKeeper object they are passed into. Watchers can also be established when checking whether a node exists, or when retrieving the children of a node, because the client may want to be notified if a node is created or if children of a node are created.

Redundancy and Leader Election/Succession

Of course, a persistence service is useless if it is not redundant, especially when the integrity of our data requires us to persist reserved sequence IDs. We only want one process at a time persisting sequence IDs. If that process goes down, we want another process to step in immediately. In other words, we want a single leader selected from a group of two or more independent processes, and we want immediate succession if that leader goes away.

In order to handle Leader Election and Leader Succession, the multiple persistence processes create and watch Sequential-Ephemeral nodes. Sequential-Ephemeral nodes have the following properties:
  1. Their names carry an automatically incrementing sequence number, so each new node sorts after any previously created one.
  2. They go away if the ZooKeeper instance that created them goes away.
The persistence processes use these two properties when they start up:
  1. They create a Sequential-Ephemeral node (note, this requires that they have a ZooKeeper instance open for as long as they are alive, so that the node sticks around).
  2. They check to see if there is a lower numbered Sequential-Ephemeral node.
  3. If not, they are the leader, and they register to watch for changes on the nodes used to track sequence IDs.
  4. If there is a lower numbered sequential-ephemeral node, they register to watch that node. Specifically, they want to get notified if that node goes away.
  5. They only want to watch the nearest lower numbered node. This avoids the 'swarm' that would happen if every backup watched the single leader node, so succession is guaranteed to be orderly and done by only one node at a time.

public synchronized void startListening() throws Exception {
       
       // create a persistent node to store Sequential-Ephemerals under.
       if(checkExists(sessionZooKeeper, SEQUENCE_MAINTAINER_LEADER_NODE) == false) {
           createNode(sessionZooKeeper, SEQUENCE_MAINTAINER_LEADER_NODE, null,
              CreateMode.PERSISTENT);
       }
       
       // sequential nodes end in /n_

       String path = SEQUENCE_MAINTAINER_LEADER_NODE+PREFIX;
       createdNode= createNode(sessionZooKeeper, path, null,
           CreateMode.EPHEMERAL_SEQUENTIAL);
       
       // this method transforms the sequential node string to a number
       // for easy comparison
       int sequence = sequenceNum(createdNode);
       
       // only watch the sequence if we are the lowest node.
       if(doesLowerNodeExist(sequence) == false) {
           logger.debug("this node is the primary node");
           isPrimary = true;
           loadSequenceMaintainers();
       }
       else 
       {
           // this node is a backup, watch the next node for failure.
           isPrimary = false;
           
           watchedNode = getNextLowestNode(sequence);
           if(watchedNode != null) {
               logger.info("this node is not primary, it is watching "+watchedNode);
               boolean added = super.addWatch(sessionZooKeeper,this,watchedNode);
               if(added == false) {
                   throw new SequenceIDDoesntExist(watchedNode);
               }
           }
           else {
               throw new SequenceIDDoesntExist(watchedNode);
           }
       }
       
       isListening = true;
    }

In the code above, I've abstracted a lot of the details of interacting with ZooKeeper behind helper methods; hopefully the method names make it clear what I'm doing. There is plenty of specific documentation about the ZooKeeper API; I'm more interested in showing the logic behind electing a leader.
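
For the curious, the two helpers doing the real work might look something like this. This is my own sketch, not the production code; it assumes sequential node names end with ZooKeeper's zero-padded counter, e.g. n_0000000007:

```java
import java.util.List;

public class ElectionHelpers {

    // Pull the numeric suffix off a sequential node name,
    // e.g. "n_0000000007" -> 7.
    static int sequenceNum(String nodeName) {
        int idx = nodeName.lastIndexOf('_');
        return Integer.parseInt(nodeName.substring(idx + 1));
    }

    // Among all live election nodes, find the closest one below ours:
    // that is the single node this process should watch.
    static String getNextLowestNode(List<String> children, int mySequence) {
        String best = null;
        int bestSeq = -1;
        for (String child : children) {
            int seq = sequenceNum(child);
            if (seq < mySequence && seq > bestSeq) {
                bestSeq = seq;
                best = child;
            }
        }
        return best; // null means we are the lowest node: we are the leader
    }
}
```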

The code above shows that only one process will be the leader. The other processes will be watching the next lowest node, waiting for it to fail. This watching, of course, is done via the Watcher::process() method:

public void process(WatchedEvent event) {
        
        EventType type = event.getType();
        String path = event.getPath();
        
        if((path != null && path.equals(watchedNode)) && (type != null &&
           type.equals(EventType.NodeDeleted))) {
            
            try {
                int watchSequence = this.sequenceNum(watchedNode);
                if(this.doesLowerNodeExist(watchSequence) == false) {
                    logger.debug("watcher of "+watchedNode+" is now primary");
                    isPrimary = true;
                    watchedNode = null;
                    // now you are the leader! the previous leader has gone away.
                    // note you are no longer watching anyone, so no need to
                    // re-register the watch.
                    loadSequenceMaintainers();
                }
                else {
                    logger.debug("watcher of "+watchedNode+" is not yet primary");
                    // there is lower node that is not the leader. 
                    // so watch it instead. 
                    watchedNode = getNextLowestNode(watchSequence);
                    boolean success = addWatch(sessionZooKeeper, this, watchedNode);
                    if(success == false) {
                        throw new SequenceIDDoesntExist(watchedNode);
                    }
                }
            } catch (Exception e) {
                // fail fast for now
                logger.error(e.getMessage());
                throw new RuntimeException(e);
            }
        }
         ......
    }

Note that if an intermediate process fails, the watching process picks the next lowest surviving node and watches that instead. If all intermediate processes fail, the watching process becomes the leader.

Dynamic Node Detection

Once these long lived services are up and running, I don't want to have to restart them just to add another sequence node to be tracked. We do this all the time, because we are running multiple instances of the data pipeline and tracking sequences across all of them.
This again can be handled by setting a watch, this time on the children of a top-level node, and restricting sequence node creation to directly underneath that node. In other words, have a ZooKeeper node called /all-sequences and stick sequence-1...sequence-N underneath it. We set the watch on the parent node when we check to see if it has children:

children = zooKeeper.getChildren(sequenceParentNode, true);

This registers the class that created the zooKeeper instance as the Watcher. In the process() handler, we detect whether children have been added or deleted. Unfortunately, ZooKeeper only tells us that the set of children has changed, so it is up to us to determine which ones have been deleted and which have been added:

public void process(WatchedEvent event) {
        
        EventType type = event.getType();
        String path = event.getPath();
        ....
        if(path != null && path.startsWith(sequenceParentNode) && (type != null)) {
            // getting this notification implies that you are a primary b/c that 
            // is the only way to register for it. 
            try {               
                if(type.equals(EventType.NodeChildrenChanged)) {
                    // figure out missing and new nodes (expensive, but this 
                    // only happens when a node is manually added)
                    List<String> newSequences = new ArrayList<String>();
                    List<String> missing = new ArrayList<String>();
                    List<String> children = this.getChildren(sessionZooKeeper,
                        sequenceParentNode,false);
                    for(String child : children) {
                        if(this.sequenceMaintainers.containsKey(child) == false) {
                            newSequences.add(child);
                        }
                    }
                    
                    for(String currentSequence : sequenceMaintainers.keySet()) {
                        if(children.contains(currentSequence) == false) {
                            missing.add(currentSequence);
                        }
                    }
                    
                    // add new sequences to watch list
                    for(String child : newSequences) {
                        String sequencePath = sequenceParentNode+"/"+child;
                        sequenceMaintainers.put(child,
                           new SequenceMaintainer(s3Accessor,
                             hosts,sequencePath,true));
                    }
                    
                    for(String child : missing) {
                        sequenceMaintainers.remove(child);
                    }
                    
                }
                
                boolean success = addWatch(sessionZooKeeper, this, 
                   sequenceParentNode);
                if(success == false) {
                    throw new SequenceIDDoesntExist(sequenceParentNode);
                }
            } catch (Exception e) {
                // fail fast for now
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
    }

Conclusion

I'm pretty amazed at how much I was able to leverage two main pieces of functionality -- nodes and watches -- to build a redundant, persistent sequence-watching service. Once again it seems that picking the right primitives is what makes ZooKeeper so practical for distributed synchronization.

Thursday, October 1, 2009

Quick Webserver setup with Jersey and Jetty

No More Hand Rolling

In our data pipeline, we have different components that we communicate with via web services.  In the beginning, there were only three commands needed: pause, restart, and reload. So I wrote a quick Servlet, loaded up embedded Jetty, and called it good. The Servlet contained some method handling code to parse path strings:

String path = request.getPathInfo();
        
        if(path.equals(ROOT)) {
            response.setContentType("text/html");
            response.setStatus(HttpServletResponse.SC_OK);
            response.getWriter().println("name");
            response.getWriter().println("getInfo()");
        }
        else if(path.equals(STATS)) {
            response.setContentType("text/json");
            response.setStatus(HttpServletResponse.SC_OK);
            
            JSONObject responseObject = new JSONObject();
            responseObject.put("service", name);
            responseObject.put("status",status.toString().toLowerCase());
            responseObject.put("statistics", jobStatsAnalyzer.toJSON());
            response.getWriter().println(responseObject);
        }
        else if(path.startsWith(SERVICE_PREFIX)) {
            response.setContentType("text/html");
            
            responseCode =  
              processServiceRequest(path.substring(SERVICE_PREFIX.length(),
                path.length()),response);
            response.setStatus(responseCode);
            
        }
        else {
           ......
        }
And the processServiceRequest call is equally complex because it has to parse the next section of the path. Still, because there were only three methods and it took little time to code up, I felt fine about hand rolling the Servlet, even though there was a lot of boilerplate path parsing code.
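
To give a feel for that boilerplate, here is a boiled-down version of the kind of string surgery involved (a made-up reduction, not our actual servlet code):

```java
public class PathBoilerplate {

    // Hand-rolled dispatch on the path segment after /service/ :
    // every new command means another branch of string matching.
    static int processServiceRequest(String subPath) {
        if (subPath.equals("pause"))   { /* serviceHandler.pause();   */ return 200; }
        if (subPath.equals("restart")) { /* serviceHandler.restart(); */ return 200; }
        if (subPath.equals("reload"))  { /* serviceHandler.reload();  */ return 200; }
        return 404; // unknown command
    }

    public static void main(String[] args) {
        String path = "/service/pause";
        String prefix = "/service/";
        System.out.println(processServiceRequest(path.substring(prefix.length()))); // prints 200
    }
}
```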

One of our components now needs (much) more path handling. It is a validation component -- it collects transformation exceptions thrown by the view processing components. Those exceptions are dumped to an SQS queue, picked up by the validator, and written to a Lucene index that allows us to query for exceptions by various exception metadata. The validator needs to expose a richer REST interface that allows a data curator to find exceptions (resources) by that metadata (sub-resources), i.e. by exception type. They can then fix the root cause of the exceptions, and validate that the exceptions go away via a set of scripts that query the validator web service.

One option to extend the current web service functionality would be to subclass the custom Servlet, but that's a lot more boilerplate code and I know that we are probably going to need to extend another component in another way, which would mean more code. More code to debug, more code to maintain, more code to understand.

Jersey, aka JAX-RS, aka JSR 311, allows you to compose RESTful services using annotations. It is an alternative to hand rolling servlets that lets you declaratively bind REST methods (GET/PUT/POST/etc.), paths, and handler functions. It handles serializing data from POJOs to XML/JSON and vice versa. I had been wanting to check it out for some time, but simply didn't have a concrete need to do so.

Jersey And Jetty

I decided to stick with Jetty as my servlet container because launching an embedded instance is so brain dead. But I decided to use the Jersey servlet and see how hard it would be to re-implement my hand rolled servlet. Binding the Jersey servlet to Jetty uses Jetty's ServletHolder class to instantiate the Jersey servlet and initialize its annotation configuration, as well as the location of the resources it is going to use to handle web requests. The code below shows how the Jetty ServletHolder is initialized with the Jersey ServletContainer (which actually implements the standard Servlet interface) and then bound to a context that allows the ServletContainer to handle all requests to the server.

sh = new ServletHolder(ServletContainer.class);
        
sh.setInitParameter("com.sun.jersey.config.property.resourceConfigClass", RESOURCE_CONFIG);
sh.setInitParameter("com.sun.jersey.config.property.packages", handlerPackageLocation);
        
server = new Server(port);
        
Context context = new Context(server, "/", Context.SESSIONS);
context.addServlet(sh, "/*");
server.start();

The com.sun.jersey.config.property.packages parameter points to the location of the Jersey annotated resources that the Jersey ServletContainer uses when handling requests. Those resources are simply POJOs (Plain Old Java Objects) marked up with Jersey annotations.

Jersey Resources

In order to handle a specific path, you create an object and annotate it with the @Path annotation. A method in the POJO is bound to that path by default. You can also handle subpaths by binding them to other methods via the @Path annotation. Here is an example:

@Path("/")
public class DefaultMasterDaemonService {

        
    private ServiceHandler serviceHandler;


    // this one handles root requests
    @GET
    @Produces("text/plain")
    public String getInformation() {
        return serviceHandler.getInfo();
    }
    
    // this one handles /stats requests
    @GET
    @Path("/stats")
    @Produces("application/json")
    public DaemonStatus getStatus() {
        return serviceHandler.getStatus();
        
    }
    .....
}

Basic Annotations

There are a couple of annotations above worth discussing in addition to the @Path annotation.
The HTTP method that is bound to the POJO method is specified via the @GET, @POST, @PUT, @DELETE, and @HEAD annotations.
The returned content Mime type is specified with the @Produces annotation. In the example above, a request to the root path returns some informational text, and a request to /stats returns JSON.

Returning JSON and XML

In order to return JSON/XML, you need to leverage JAXB annotations to make your data objects serializable to JSON/XML. Note: remember to always include a default constructor on your data objects. Otherwise you get exceptions trying to serialize those objects.

I also found that if I declared getters and setters for the annotated public fields, I would get serialization errors. I had not seen this behavior before, and assume it is something specific to Jersey serialization.

Here is an example of a JAXB annotated object that I use to return Status:
@XmlRootElement()
public class DaemonStatus {
    // apparently methods to access these are added at serialization time??
    @XmlElement
    public String serviceName;
    @XmlElement
    public String status;
    @XmlElement
    public JobStatsData jobStatsData;
    
    // need this one!
    public DaemonStatus() {
        
    }
    
    public DaemonStatus(String serviceName,String status,JobStatsData jobStatsData) {
        this.serviceName = serviceName;
        this.status = status;
        this.jobStatsData = jobStatsData;
        
    }
}

So all I needed to do to get JSON/XML in my return type was to create a JAXB annotated object, and specify what I wanted the method to produce via the Jersey @Produces annotation. Less code = more fun!

Parameterizing Path Components

Our components have Pause/Restart/Reload functionality accessible via the http://host/services/{pause|restart|reload} path, using POST. Jersey lets me parameterize the last part of the path, which makes the syntax of the command explicit while allowing me to only code string matching for the parameterized part:

@POST
@Path("/service/{action}")
public void doAction(@PathParam("action") String action) throws Exception {
        
  if(action.equals(MasterDaemon.PAUSE)) {
    serviceHandler.pause();
  }
  else if(action.equals(MasterDaemon.RELOAD)) {
    serviceHandler.reload();
  }
  else if(action.equals(MasterDaemon.RESUME)) {
    serviceHandler.resume();
  }
  else {
    throw new Exception("No such action supported: "+action);
  }
}
I've delegated the real meat of the action to a serviceHandler component, but this kind of path handling is about as easy as it gets. Note that the action parameter is specified via the @PathParam annotation directly in the method argument list.

Conclusion

I only really scratched the surface of what Jersey can do. In my case I don't have to parse query parameters, but that is easily done by specifying a @QueryParam argument to the handler method in the same way I specified the @PathParam. From what I've been able to understand, you can only access query params as strings (but that's pretty reasonable).

I really liked how quickly I was able to toss out my hand coded servlet and trade up to the Jersey one. Other people on the team were able to wire up rich REST interfaces on several components almost immediately, which let all of us go back to focusing on real requirements.

I usually 'cast a jaundiced eye' towards anything that even has a hint of framework in it, but Jersey was super fast to learn and using it instead of hand coded servlets has already saved us a lot of time and finger strain.