Monday, December 21, 2009

Impedance Mismatch

Post Mortem:

Long story short: every time I've had a project fail, one recurring theme has been bad technology choices. There have been others, like group dysfunctionality aka 'collective asshat fatigue', where the entire group stops functioning to avoid dealing with one or more aberrant personalities. Bad project scoping/definition also contributes to failure rates, but I don't think that there is much intersection between groundbreaking work and the prototypical well defined, well understood project. So it would seem that issues of scoping as well as team dynamics are (a) exacerbated by and therefore (b) secondary to the bad technology choices that put the project in jeopardy.

I thought I had gotten better at detecting when I was making a bad technology choice, but I recently made one. Fortunately we were able to turn things around, but it was hard.  In the interest of not making this class of bad decision again -- because the definition of insanity is to do the same thing and expect different results --  I want to dissect what went wrong, what went right, and what I learned.

The Choice:

I recently started a new job. My first task was to jump in and assist on a prototype project by writing feed parsers that would parse millions of rows of comma separated values from feeds into various records in an SQL database. I was initially constrained to using Java.  I took a standard object=row approach persisting single objects at a time.

The code was test-driven, separation of concerns was good, and all unit tests passed (coverage was good). Several hundred lines of code were required to parse, clean, validate, and insert data from various feeds into the database. I did not use an ORM; I used straight SQL via JDBC.

The Results:

The performance was horrendous. Inserting several million rows took hours, hours that we simply didn't have. The performance impacted the effectiveness of every downstream operation, and was jeopardizing the success of the overall project. Not a good way to start a new job :)

The Workaround:

We ended up ditching Java completely and relying on existing unix commandline tools to parse the files, insert into temp tables, and do bulk updates/inserts of rows from those temp tables into the main, 'canonical' tables. In other words, the 100s of lines of Java parsing and insertion code that I wrote in a week or so (counting unit testing) and frantically reworked several times to try and speed up were replaced by something like this:

cat rawfile.csv | cut -d, -f1 | tr '[:upper:]' '[:lower:]' | sed -e 's/\r//g' | sort | uniq | psql -c "copy tablename from stdin using delimiters ','" | actual_queries.pl


This took a multi hour query down to 5 minutes. There was a bunch of pre-formatting prior to inserting into the database, and a perl script that ran afterwards, using DBI to copy/update from the temp table.

In general, the one rule that emerged was 'do as much processing before going to the db'. For example, determining set exclusion/intersection, which is something I would have definitely gone to code or SQL for, could be done via commandline via the comm utility:

comm -12 <(sort file1) <(sort file2) gives the intersection of file1 and file2.
comm -13 <(sort file1) <(sort file2) gives unique lines from file2
etc.

Conclusions:

Conclusion 1: Tests Still Required.

The good thing about piping a bunch of common unix tools together is that they have been around for a long, long time. Meaning you don't have to worry about the integrity of the data as much as you have to worry about using the tool options correctly. The bad thing about this approach is that the only kind of testing is integration testing, and it is easy to blow off when the initial solution works (or seems to).

After getting bitten when the queries worked but the data had integrity issues that manifested in the logic,  we ended up writing a bunch of scripts that verified data integrity by making queries and inspecting result sets. We also leveraged the database, adding constraints that would allow the script to fail fast and alert us to schematic integrity issues, like duplicate rows.

Conclusion 2: It's Not the Database's Fault.

The database is a very convenient scapegoat, but the truth  is that I spoon fed data into the database, and I could only move as fast as I could move my spoon (in Java). The better approach is to bulk feed data into the database, via bulk copies and bulk inserts/updates. Again, verification/validation scripts and constraints are required.

Conclusion 3: SQL Good, ORM Bad.

The truth is that we could have done this in Java, had we just used the same SQL we ended up using in the Workaround. My mistake when using Java was to put on my ORM blinders, which are great for when I want to pretend that there is some arbitrary data store underneath my code. This works until it doesn't, usually at 12AM the day of a release.

Multiple FAILS mean I'm done pretending the database is some fuzzy abstract data 'store', because I will use one when I want to mine data along arbitrary axes -- in other words, I'll use a database precisely to use SQL and not some mapping to it. SQL is a mature and extremely powerful way to ask open ended questions of a schema. If I don't want to ask open ended questions of my data, I shouldn't use a database. Because that's what they're built for. BTW I haven't used Hive or Pig yet, but these seem to be the QL solutions for much larger datasets than the one I was working with.

Conclusion 4: When in Doubt, Go Cheap, Go Fast.

However, just because we could have done it in Java doesn't mean we should have. Perl or Ruby or Python or Bash and the plethora of solid utilities available will now always be my first option when putting together a data input operation at this particular scale.

I think there will always be those opportunities that present themselves as vaguely defined chances to hit it big. Instead of taking lots of time up front to define the work involved at the expense of the actual opportunity, I'm going to move ahead with cheap and fast technologies that let me change path extremely quickly, because I'm sure I will need to at least once during the course of an ill defined project.

Conclusion 5: Keep the blinders off!

This entire experience was a huge reminder to me to be open minded about choosing the right tool for the job. This was an instance where I let the technology choice mandate my implementation decisions, instead of the other way around. Every time I do that, I get screwed. If, instead of putting my head down and running as fast as I could,  I had initially asked questions about the duration of the project, the intention of the code, the performance constraints on data input, etc, I could have easily justified the use of Perl/unix tools/raw SQL, and saved a lot of late nights/coding angst.

Conclusion 6: It's The People, Stupid

One thing that overwhelmingly shone through even in the grimmest of moments was the quality and class of the people I was working with. They all stayed completely focused on the solution, and did not point any fingers even when to do so would have been more than understandable. Furthermore, they were able to keep their sense of humor intact. While I didn't necessarily enjoy making this big of a mistake at a new job, the level of teamwork, professionalism, and respect from my new co-workers was complete confirmation of my reasons for jumping ship from my old company.

Thursday, October 15, 2009

Using Zookeeper to Implement a Redundant Persistence Service

Preface

I've previously detailed how we use Zookeeper to generate unique sequenceIDs for items created by a pool of worker processes. Zookeeper is the ideal solution for this problem because of its strict order/version enforcing that allows completely non-blocking access to data, its redundancy, and its easy to understand node and data paradigm.

Our current approach has multiple clients (one per worker process) requesting the latest sequence ID from a Zookeeper node. These sequence IDs are critical to the integrity of our data: they cannot be duplicated. Using Zookeeper version information, multiple clients request values and try to update those values with a specific version. This update will fail if someone else has updated the version in the meantime, so the client handles that failure, gets the latest value, increments it, and tries to update again. Again this is a non-blocking approach to updating a value and so there is no system level bottleneck.
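The read/increment/conditional-write loop described above can be sketched without a running ZooKeeper ensemble. This is only an illustration: VersionedNode is a hypothetical in-memory stand-in for a ZooKeeper node, and SequenceClient is a made-up name. In the real client the conditional write is setData(path, data, version), which fails with a BadVersionException when someone else has written first.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory stand-in for a versioned ZooKeeper node (illustration only).
final class VersionedNode {
    // Immutable snapshot: a value plus the version it was written at.
    static final class Snapshot {
        final long value;
        final int version;

        Snapshot(long value, int version) {
            this.value = value;
            this.version = version;
        }
    }

    private final AtomicReference<Snapshot> state =
        new AtomicReference<>(new Snapshot(0L, 0));

    Snapshot read() {
        return state.get();
    }

    // Succeeds only if nobody has written since expectedVersion was read.
    boolean setIfVersion(long newValue, int expectedVersion) {
        Snapshot current = state.get();
        if (current.version != expectedVersion) {
            return false;
        }
        return state.compareAndSet(current, new Snapshot(newValue, expectedVersion + 1));
    }
}

// Hypothetical client: read, increment, write conditionally, retry on conflict.
final class SequenceClient {
    static long nextId(VersionedNode node) {
        while (true) {
            VersionedNode.Snapshot seen = node.read();
            long candidate = seen.value + 1;
            if (node.setIfVersion(candidate, seen.version)) {
                return candidate; // our write won, so this ID is ours alone
            }
            // Someone else updated the node first: loop, re-read, and try again.
        }
    }
}
```

No client ever holds a lock here; a loser of the race simply re-reads and retries, which is why there is no system level bottleneck.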

Redundancy Required

In order to make our system truly redundant, we need to account for what happens if all zookeeper instances go offline by persisting the latest generated sequence IDs -- again we absolutely need to not duplicate IDs to maintain system integrity. When we persist sequence IDs, it is possible to restart all zookeeper instances and have them pick up where they left off. Note that we can reduce the amount of persisting needed by 'reserving' a future ID, persisting it, and only modifying it when the generated IDs actually get to that value. In other words, persist ID 100, and update that value to 200 when the system generates ID = 100. This maintains ID uniqueness across system restarts at the loss of up to 100 values, which is a decent tradeoff.
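Here is a rough sketch of the reserve-ahead bookkeeping, assuming a block size of 100. ReservedSequence and its methods are hypothetical names, and the "durable storage" is just a field so the example stays self-contained.

```java
// Hypothetical bookkeeping for the reserve-ahead scheme; not the actual service code.
final class ReservedSequence {
    static final long RESERVE_AMOUNT = 100L;

    // Stand-in for durable storage (a real service would write to disk or a remote store).
    private long persistedReservation;

    ReservedSequence(long initialReservation) {
        this.persistedReservation = initialReservation;
    }

    // Called for every generated ID; persists a new reservation at each block boundary.
    // Returns true when a write to durable storage happened.
    boolean onGenerated(long currentId) {
        if (currentId % RESERVE_AMOUNT == 0) {
            persistedReservation = currentId + RESERVE_AMOUNT;
            return true;
        }
        return false;
    }

    // After a full restart, resume from the reserved value so no earlier ID is reissued.
    long restartValue() {
        return persistedReservation;
    }
}
```

With an initial reservation of 100, generating IDs 1 through 150 causes a single persisted write (at ID 100, moving the reservation to 200). If everything goes down at 150, the system restarts at 200 and IDs 151 through 199 are simply never used.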

Persistence via Watches

The simplest implementation of a persistence service takes advantage of ZooKeeper's watch functionality, which lets a client register for notifications when a node goes away or its value changes. The client gets notified every time a watched value changes, and receives an Event object with the details of the change. In our case, the client is a Persistence Service, which retrieves the location of the updated data from the Event object, retrieves the data, and determines whether it needs to reserve a future block of numbers as described above. Note that Zookeeper Watches are defined to be one time triggers, so it is necessary to reset the watch if you want to keep receiving notifications about the data of a specific node, or the children of a specific node.

You watch data by registering a Watcher callback interface with Zookeeper. The Watcher interface declares the process() method, which handles the WatchedEvent parameter. The following process() method determines when to persist the next reserved value.

public void process(WatchedEvent event) {
        Stat stat = new Stat();
        
        EventType type = event.getType();
        String node = event.getPath();
        if(type.equals(EventType.NodeDataChanged)) {
            
            try {
                
                byte inData[] = getData(sessionZooKeeper,event.getPath(),stat);
                currentSequenceID = SequenceHelper.longFromByteArray(inData);
                
                if(currentSequenceID % RESERVE_AMOUNT == 0) {
                    persistSequenceID(node,currentSequenceID+RESERVE_AMOUNT);
                }
                
            } catch (Exception e) {
                logger.error(e);
            }
            
        }
        else if(type.equals(EventType.NodeDeleted)) {
            logger.error("data node "+event.getPath()+" was deleted");
        }
        
        // every time you process a watch you need to re-register for the next one. 
        try {
            addWatch(sessionZooKeeper,this,node);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        
    }

In this case, I point Zookeeper to the Watcher object when I retrieve a ZooKeeper instance:

public ZooKeeper getZooKeeper() throws Exception {
        
        return new ZooKeeper(hostString,DEFAULT_TIMEOUT,watcher);
    }

Watchers, as is implied above, are bound to the lifecycle of the Zookeeper object they are passed into. Watchers can also be established when checking to see if a node exists, or when retrieving children of a node, because the client may want to be notified if a node is created or if children of a node are created.

Redundancy and Leader Election/Succession

Of course, a persistence service is useless if it is not redundant, especially if the integrity of our data requires us to persist reserved sequence IDs.  We only want one process at a time persisting sequence IDs. If that process goes down, we want another process to step in immediately.  In other words, we want a single leader to be selected from a group of 2 or more independent processes, and we want immediate succession if that leader were to go away.

In order to handle Leader Election and Leader Succession, the multiple persistence processes create and watch  Sequential-Ephemeral nodes. Sequential-Ephemeral nodes have the following properties:
  1. They automatically increment if there is a previous numbered node.
  2. They go away if the ZooKeeper instance  that created them goes away. 
The persistence processes use these two properties when they start up:
  1. They create a Sequential-Ephemeral node (note, this requires that they have a ZooKeeper instance open for as long as they are alive, so that the node sticks around).
  2. They check to see if there is a lower numbered Sequential-Ephemeral node.
  3. If not, they are the leader, and they register to watch for changes on the nodes used to track sequence IDs.
  4. If there is a lower numbered sequential-ephemeral node, they register to watch that node. Specifically, they want to get notified if that node goes away.
  5. They only want to watch the nearest lower numbered node. This avoids a 'swarm' that would happen if all lower nodes watched the single leader node. So succession is guaranteed to be orderly and only done by one node at a time.
public synchronized void startListening() throws Exception {
       
       // create a persistent node to store Sequential-Ephemerals under.
       if(checkExists(sessionZooKeeper, SEQUENCE_MAINTAINER_LEADER_NODE) == false) {
           createNode(sessionZooKeeper, SEQUENCE_MAINTAINER_LEADER_NODE, null,
              CreateMode.PERSISTENT);
       }
       
       // sequential nodes end in /n_

       String path = SEQUENCE_MAINTAINER_LEADER_NODE+PREFIX;
       createdNode= createNode(sessionZooKeeper, path, null,
           CreateMode.EPHEMERAL_SEQUENTIAL);
       
       // this method transforms the sequential node string to a number
       // for easy comparision
       int sequence = sequenceNum(createdNode);
       
       // only watch the sequence if we are the lowest node.
       if(doesLowerNodeExist(sequence) == false) {
           logger.debug("this node is the primary node");
           isPrimary = true;
           loadSequenceMaintainers();
       }
       else 
       {
           // this node is a backup, watch the next node for failure.
           isPrimary = false;
           
           watchedNode = getNextLowestNode(sequence);
           if(watchedNode != null) {
               logger.info("this node is not primary, it is watching "+watchedNode);
               boolean added = super.addWatch(sessionZooKeeper,this,watchedNode);
               if(added == false) {
                   throw new SequenceIDDoesntExist(watchedNode);
               }
           }
           else {
               throw new SequenceIDDoesntExist(watchedNode);
           }
       }
       
       isListening = true;
    }

In the code above, I've abstracted a lot of the details of interacting with Zookeeper under methods. Hopefully the method names make it clear what I'm doing. There is enough specific documentation about the ZooKeeper API; I'm more interested in showing the logic behind electing a leader.

The code above shows that only one process will be the leader. The other processes will be watching the next lowest node, waiting for it to fail. This watching, of course, is done via the Watcher::process() method:

public void process(WatchedEvent event) {
        
        EventType type = event.getType();
        String path = event.getPath();
        
        if((path != null && path.equals(watchedNode)) && (type != null &&
           type.equals(EventType.NodeDeleted))) {
            
            try {
                int watchSequence = this.sequenceNum(watchedNode);
                if(this.doesLowerNodeExist(watchSequence) == false) {
                    logger.debug("watcher of "+watchedNode+" is now primary");
                    isPrimary = true;
                    watchedNode = null;
                    // now you are the leader! the previous leader has gone away.
                    // note you are no longer watching anyone, so no need to
                    // re-register the watch.
                    loadSequenceMaintainers();
                }
                else {
                    logger.debug("watcher of "+watchedNode+" is not yet primary");
                    // there is lower node that is not the leader. 
                    // so watch it instead. 
                    watchedNode = getNextLowestNode(watchSequence);
                    boolean success = addWatch(sessionZooKeeper, this, watchedNode);
                    if(success == false) {
                        throw new SequenceIDDoesntExist(watchedNode);
                    }
                }
            } catch (Exception e) {
                // fail fast for now
                logger.error(e.getMessage());
                throw new RuntimeException(e);
            }
        }
         ......
    }

Note that if an intermediate process fails, the 'watching' process then gets the next available lowest node to watch and watches it. If all intermediate processes fail, the watching process becomes the leader.
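The helper methods sequenceNum() and getNextLowestNode() are not shown above, so here is one guess at how that selection logic could look. The class name and method bodies are my own assumptions. The sketch relies on ZooKeeper appending a zero-padded counter to sequential node names, and on the n_ prefix mentioned in the code comments.

```java
import java.util.List;

// Hypothetical helper; the service's real implementations are not shown in this post.
final class LeaderElectionHelper {
    // Extracts the numeric suffix of a sequential node, e.g. "n_0000000007" -> 7.
    static int sequenceNum(String nodeName) {
        int idx = nodeName.lastIndexOf('_');
        return Integer.parseInt(nodeName.substring(idx + 1));
    }

    // Returns the child with the highest sequence strictly below mySequence,
    // or null when no lower node exists (meaning this process is the leader).
    static String nextLowestNode(List<String> children, int mySequence) {
        String best = null;
        int bestSeq = -1;
        for (String child : children) {
            int seq = sequenceNum(child);
            if (seq < mySequence && seq > bestSeq) {
                best = child;
                bestSeq = seq;
            }
        }
        return best;
    }
}
```

Watching only the nearest lower neighbour is what keeps succession orderly: when a node disappears, exactly one watcher is notified, and it either takes over or picks the next neighbour down.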

Dynamic Node Detection

Once these long lived services are up and running, I don't want to have to restart them if I am adding another sequence node to be tracked. We do this all of the time, because we are running multiple instances of the data pipeline and tracking sequences across all of them.
This again can be handled by setting a watch, this time on the children of a top level node, and restricting sequence node creation to directly underneath that node. In other words, have a zookeeper node called /all-sequences and stick sequence-1...sequence-N underneath it. We set the watch on the node when we check to see if it has children:
children = zooKeeper.getChildren(sequenceParentNode, true);
This registers the class that created the zooKeeper instance as the Watcher. In the process() handler, we detect whether children have been deleted or added. Unfortunately, we can only detect if children underneath a node have changed, so it is up to us to determine which ones have been deleted and which ones have been added:
public void process(WatchedEvent event) {
        
        EventType type = event.getType();
        String path = event.getPath();
        ....
        if(path != null && path.startsWith(sequenceParentNode) && (type != null)) {
            // getting this notification implies that you are a primary b/c that 
            // is the only way to register for it. 
            try {               
                if(type.equals(EventType.NodeChildrenChanged)) {
                    // figure out missing and new nodes (expensive, but this 
                    // only happens when a node is manually added)
                    List<String> newSequences = new ArrayList<String>();
                    List<String> missing = new ArrayList<String>();
                    List<String> children = this.getChildren(sessionZooKeeper,
                        sequenceParentNode,false);
                    for(String child : children) {
                        if(this.sequenceMaintainers.containsKey(child) == false) {
                            newSequences.add(child);
                        }
                    }
                    
                    for(String currentSequence : sequenceMaintainers.keySet()) {
                        if(children.contains(currentSequence) == false) {
                            missing.add(currentSequence);
                        }
                    }
                    
                    // add new sequences to watch list
                    for(String child : newSequences) {
                        String sequencePath = sequenceParentNode+"/"+child;
                        sequenceMaintainers.put(child,
                           new SequenceMaintainer(s3Accessor,
                             hosts,sequencePath,true));
                    }
                    
                    for(String child : missing) {
                        sequenceMaintainers.remove(child);
                    }
                    
                }
                
                boolean success = addWatch(sessionZooKeeper, this, 
                   sequenceParentNode);
                if(success == false) {
                    throw new SequenceIDDoesntExist(sequenceParentNode);
                }
            } catch (Exception e) {
                // fail fast for now
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }

Conclusion

I'm pretty amazed how much I was able to leverage two main points of functionality -- nodes and watches -- to build a persistent sequence watching service. Once again it seems that picking the right primitives is what makes ZooKeeper so practical for distributed synchronization.

Thursday, October 1, 2009

Quick Webserver setup with Jersey and Jetty

No More Hand Rolling

In our data pipeline, we have different components that we communicate with via web services.  In the beginning, there were only three commands needed: pause, restart, and reload. So I wrote a quick Servlet, loaded up embedded Jetty, and called it good. The Servlet contained some method handling code to parse path strings:

String path = request.getPathInfo();
        
        if(path.equals(ROOT)) {
            response.setContentType("text/html");
            response.setStatus(HttpServletResponse.SC_OK);
            response.getWriter().println("name");
            response.getWriter().println("getInfo()");
        }
        else if(path.equals(STATS)) {
            response.setContentType("text/json");
            response.setStatus(HttpServletResponse.SC_OK);
            
            JSONObject responseObject = new JSONObject();
            responseObject.put("service", name);
            responseObject.put("status",status.toString().toLowerCase());
            responseObject.put("statistics", jobStatsAnalyzer.toJSON());
            response.getWriter().println(responseObject);
        }
        else if(path.startsWith(SERVICE_PREFIX)) {
            response.setContentType("text/html");
            
            responseCode =  
              processServiceRequest(path.substring(SERVICE_PREFIX.length(),
                path.length()),response);
            response.setStatus(responseCode);
            
        }
        else {
           ......
        }
And the processServiceRequest call is equally complex because it has to parse the next section of the path. Still, because there were only three methods and it took little time to code up, I felt fine about hand rolling the Servlet, even though there was a lot of boilerplate path parsing code.

One of our components now needs (much) more path handling. It is a validation component -- it collects transformation exceptions thrown by the view processing components. Those exceptions are dumped to an SQS queue, picked up by the validator, and dumped into a Lucene index that allows us to query for exceptions by various exception metadata. The validator needs to expose a more complex REST interface that allows a data curator to find exceptions (resources) by that metadata (sub resources), e.g. by exception type. They can then fix the root cause of the exceptions, and validate that the exceptions go away via a set of scripts that query the validator web service.

One option to extend the current web service functionality would be to subclass the custom Servlet, but that's a lot more boilerplate code and I know that we are probably going to need to extend another component in another way, which would mean more code. More code to debug, more code to maintain, more code to understand.

Jersey is the reference implementation of JAX-RS (JSR 311), which allows you to compose RESTful services using annotations. It is an alternative to hand rolling servlets that lets you declaratively bind REST methods (GET/PUT/POST/etc), paths, and handler functions. It handles serializing data from POJOs to XML/JSON and vice versa. I had been wanting to check it out for some time now, but simply didn't have a concrete need to do so.

Jersey And Jetty

I decided to stick with Jetty as my servlet container because launching an embedded instance was so brain dead. But I decided to use the Jersey servlet and see how hard it would be to re-implement my hand rolled servlet. Binding the Jersey servlet to Jetty means using Jetty's ServletHolder class to instantiate the Jersey servlet and initialize its annotation location as well as the location of the resources it is going to use to handle web requests. The code below shows how the Jetty ServletHolder is initialized with the Jersey ServletContainer (which actually implements the standard Servlet interface) and then bound to a context that allows the ServletContainer to handle all requests to the server.
sh = new ServletHolder(ServletContainer.class);
        
sh.setInitParameter("com.sun.jersey.config.property.resourceConfigClass", RESOURCE_CONFIG);
sh.setInitParameter("com.sun.jersey.config.property.packages", handlerPackageLocation);
        
server = new Server(port);
        
Context context = new Context(server, "/", Context.SESSIONS);
context.addServlet(sh, "/*");
server.start();

The com.sun.jersey.config.property.packages parameter points to the location of the Jersey annotated resources that the Jersey ServletContainer uses when handling requests. Those resources are simply POJOs (Plain Old Java Objects) marked up with Jersey annotations.

Jersey Resources

In order to parse a specific path, you create an object and use the @Path annotation. A method in the POJO is bound to that path by default. You can also parse subpaths by binding them to other methods via the @Path annotation. Here is an example:
@Path("/")
public class DefaultMasterDaemonService {

        
    private ServiceHandler serviceHandler;


    // this one handles root requests
    @GET
    @Produces("text/plain")
    public String getInformation() {
        return serviceHandler.getInfo();
    }
    
    // this one handles /stats requests
    @GET
    @Path("/stats")
    @Produces("application/json")
    public DaemonStatus getStatus() {
        return serviceHandler.getStatus();
        
    }
    .....
}

Basic Annotations

There are a couple of annotations above worth discussing in addition to the @Path annotation.
The HTTP method that is bound to the POJO method is specified via the @GET, @POST, @PUT, @DELETE, and @HEAD annotations.
The returned content Mime type is specified with the @Produces annotation. In the example above, a request to the root path returns some informational text, and a request to /stats returns JSON.

Returning JSON and XML

In order to return JSON/XML, you need to leverage JAXB annotations to make your data objects serializable to JSON/XML. Note: remember to always include a default constructor on your data objects. Otherwise you get exceptions trying to serialize those objects.

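I also found that I got serialization errors whenever I declared getters and setters alongside the annotated public fields; leaving them out fixed it. I had not seen this before and first assumed it was specific to Jersey serialization, but it looks like standard JAXB behavior: with the default access type, a public field and a public getter/setter pair of the same name are both bound as properties, and JAXB complains about the duplicate.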

Here is an example of a JAXB annotated object that I use to return Status:
@XmlRootElement()
public class DaemonStatus {
    // apparently methods to access these are added at serialization time??
    @XmlElement
    public String serviceName;
    @XmlElement
    public String status;
    @XmlElement
    public JobStatsData jobStatsData;
    
    // need this one!
    public DaemonStatus() {
        
    }
    
    public DaemonStatus(String serviceName,String status,JobStatsData jobStatsData) {
        this.serviceName = serviceName;
        this.status = status;
        this.jobStatsData = jobStatsData;
        
    }
}
So all I needed to do to get JSON/XML in my return type was to create a JAXB annotated object, and specify what I wanted the method to produce via the Jersey @Produces annotation. Less code = more fun!

Parameterizing Path Components

Our components have Pause/Restart/Reload functionality accessible via the http://host/services/{pause|restart|reload} path, using POST. Jersey lets me parameterize the last part of the path, which makes the syntax of the command explicit while allowing me to only code string matching for the parameterized part:

@POST
@Path("/service/{action}")
public void doAction(@PathParam("action") String action) throws Exception {
        
  if(action.equals(MasterDaemon.PAUSE)) {
    serviceHandler.pause();
  }
  else if(action.equals(MasterDaemon.RELOAD)) {
    serviceHandler.reload();
  }
  else if(action.equals(MasterDaemon.RESUME)) {
    serviceHandler.resume();
  }
  else {
    throw new Exception("No such action supported: "+action);
  }
}
I've delegated the real meat of the action to a serviceHandler component, but this kind of path handling is about as easy as it gets. Note that the action parameter is specified via the @PathParam annotation directly in the method argument list.

Conclusion

I only really scratched the surface of what Jersey can do. In my case I don't have to parse query parameters, but that is easily done by specifying a @QueryParam argument to the handler method in the same way I specified the @PathParam. Query params arrive as strings, but JAX-RS will also convert them to primitives and to types that have a String constructor or a static valueOf(String) method.

I really liked how quickly I was able to toss out my hand coded servlet and trade up to the Jersey one. Other people on the team were able to wire up rich REST interfaces on several components almost immediately, which let all of us go back to focusing on real requirements.

I usually 'cast a jaundiced eye' towards anything that even has a hint of framework in it, but Jersey was super fast to learn and using it instead of hand coded servlets has already saved us a lot of time and finger strain.

Thursday, September 10, 2009

Using ExecutorCompletionService to synchronize multithreaded workflow

Today I ran into a problem where I needed to make sure that one multithreaded phase of processing had completely ended before starting another. Specifically, I was retrieving documents from S3 to load into a lucene index, and wanted to retry all document requests that had failed due to S3 flakiness, connectivity issues, i.e. standard distributed computing error conditions.

In other situations requiring synchronization between threads, I've used a CountDownLatch. This works really well when you know the exact number of threads that you need to synchronize. You initialize the latch with the number of threads that you are synchronizing. When they finish work they decrement the latch; when the latch count goes to 0, you continue processing.
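For comparison, here is a minimal sketch of the CountDownLatch pattern. LatchExample and runWorkers are hypothetical names, and the "work" is just a counter increment.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example of waiting on a known number of worker threads.
final class LatchExample {
    // Starts workerCount threads and blocks until every one has counted down.
    static int runWorkers(int workerCount) {
        CountDownLatch done = new CountDownLatch(workerCount);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < workerCount; i++) {
            new Thread(() -> {
                try {
                    completed.incrementAndGet(); // stand-in for real work
                } finally {
                    done.countDown(); // always signal, even if the work throws
                }
            }).start();
        }

        try {
            done.await(); // returns once the count reaches zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return completed.get();
    }
}
```

The catch is visible in the constructor: the latch has to be told up front how many countDown() calls to expect.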

This time was different because instead of synchronizing threads, I was trying to halt processing until all asynchronously submitted tasks had completed processing. I was queuing up several hundred thousand tasks into a thread pool, and did not know when that thread pool would be finished with the work, or how many threads would be running when the entire job completed, or even exactly how many tasks I had to run -- that number depended on the number of documents being fetched, which is always growing.

Fortunately my situation was not a unique one.  I figured that the first place to look was the java concurrency library, and when I did some research, I found that ExecutorCompletionService was exactly what I needed.

ExecutorCompletionService works with a supplied Executor using Runnable/Callable tasks. I decided to use Callable tasks, as they allowed me to return and inspect a value, and throw exceptions.  As those tasks complete, they are placed onto a queue that can be accessed via the poll() or take() methods. This approach simplified my life:
  1. It allowed me to use tasks and task status to control logic flow instead of threads and thread status. 
  2. It freed me up from having to know how many threads or tasks I was working with.
  3. It provided me with an access point to each task that I could use to analyze the results.
  4. When the ExecutorCompletionService queue was empty, I knew it was time to then retry all failed results. 
Point (1) above is the foundation for the points that follow. When I used Future and Callable to implement the work I needed to do, I was able to return the results I cared about and process them. Specifically it allowed the code that ran the original key fetch loop to not worry about tracking and storing exceptions, which made for much simpler looping logic.

ExecutorCompletionService is a great example of how picking the right primitives makes it easy to compose highly functional helper objects. In this case the primitives involved were a (thread pool) executor and a (linked blocking) queue. (side note: I don't mean to sound like such a concurrent lib fanboy, but I'm really happy I didn't have to write this code myself, and really happy that the choice of primitives that the authors used enabled creation of classes like the ExecutorCompletionService. This stuff used to take significant effort on my part, and significant bugs were usually introduced :)


Here is an example of the ExecutorCompletionService in action:

public void buildEntireIndex() throws Exception {

        boolean moreKeys = true;
        int submittedJobCt = 0;
        // tracking all failed keys for later retry
        Map failedKeys = new HashMap();
        
        // ErrorCapturingExecutor is subclassed from ThreadPoolExecutor. 
        // I override ThreadPoolExecutor.afterExecute() to queue and later analyze exceptions.
        LinkedBlockingQueue<Runnable> linkedBlockingQueue = new LinkedBlockingQueue<Runnable>();
        ErrorCapturingExecutor executor = new ErrorCapturingExecutor(threadCount, 
            threadCount, 0L, TimeUnit.SECONDS, linkedBlockingQueue);
        
        // CompletionService works with the supplied executor and queues up tasks as they finish.
        executorCompletionService = new ExecutorCompletionService<BuildResults>(executor);
        
        
        String lastKey = null;
        
        while(moreKeys) {    
            Set<String> keys =  viewKeyRetriever.retrieveKeys(lastKey);
            
            if(keys.size() > 0) {
                String array[] = new String[keys.size()];
                keys.toArray(array); 
                lastKey = array[keys.size()-1];
                
                // I need to keep the number of waiting tasks bounded, so wait
                // until the queue has drained below the limit before submitting more.
                while(linkedBlockingQueue.size() > MAXQUEUESIZE) {
                    Thread.sleep(WAITTIME);
                }
                
                // this is where I actually submit tasks
                processKeys(keys);
                
                // I only know how many jobs I need to wait for when I've retrieved all keys.
                submittedJobCt++; 
                
            }
            else {
                moreKeys = false;
            }
            
        }

        
        // I use the ExecutorCompletionService queue to check on jobs as they complete. 
        for(int i = 0; i < submittedJobCt;i++) {
            
            Future<BuildResults> finishedJob = executorCompletionService.take();
            
            // at this point, all I really care about is failures that I need to retry.
            // Note that get() throws an ExecutionException if the task itself threw.
            BuildResults results = finishedJob.get();
            
            if(results.hasFailures()) {
                failedKeys.putAll(results.getFailedKeys());
            }
            
            indexBuilderMonitor.update(results);
        }
        
        // I can use failedKeys to retry processing on keys that have failed. 
        // Logic omitted for clarity.
        
        ...
        executor.shutdown();

    }


The processKeys method is shown below. I broke it out because I needed to call it again when re-processing failed keys:

private void processKeys(Set keys) {
        // the builder builds the index from the content retrieved using the passed in keys.
        final IndexBuilder builder = indexBuilderFactory.createBuilder(keys);
        
            
        executorCompletionService.submit(new Callable<BuildResults>() {
            @Override
            public BuildResults call() throws Exception {
                // buildResults contains all information I need to post process the task.
                // Any exception thrown here is caught by ErrorCapturingExecutor.
                return builder.build();
            }
        });
        
    }
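
The retry pass that I left out above could look something like the following self-contained sketch. Everything in it is a stand-in: RetrySketch, FetchResult and fetchTask are hypothetical names, and the "fetch" simply fails on the first pass for any key starting with "bad". It is not the production code, just the same take()-until-done loop run twice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class RetrySketch {

    // Hypothetical task result: the key that was fetched and whether it failed.
    static final class FetchResult {
        final String key;
        final boolean failed;
        FetchResult(String key, boolean failed) {
            this.key = key;
            this.failed = failed;
        }
    }

    // Stand-in for a flaky fetch: keys starting with "bad" fail on the first pass only.
    static Callable<FetchResult> fetchTask(final String key, final boolean firstPass) {
        return new Callable<FetchResult>() {
            @Override
            public FetchResult call() {
                return new FetchResult(key, firstPass && key.startsWith("bad"));
            }
        };
    }

    // Submits one task per key, waits for all of them, and returns the keys that failed.
    static List<String> runPass(ExecutorCompletionService<FetchResult> ecs,
                                List<String> keys, boolean firstPass)
            throws InterruptedException, ExecutionException {
        for (String key : keys) {
            ecs.submit(fetchTask(key, firstPass));
        }
        List<String> failed = new ArrayList<String>();
        for (int i = 0; i < keys.size(); i++) {
            Future<FetchResult> done = ecs.take(); // blocks until some task finishes
            FetchResult result = done.get();
            if (result.failed) {
                failed.add(result.key);
            }
        }
        return failed;
    }

    // Runs a first pass, retries the failures once, and returns
    // { failures after the first pass, failures after the retry }.
    static int[] fetchWithOneRetry(List<String> keys) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            ExecutorCompletionService<FetchResult> ecs =
                    new ExecutorCompletionService<FetchResult>(pool);
            List<String> failedOnce = runPass(ecs, keys, true);
            List<String> failedTwice = runPass(ecs, failedOnce, false);
            return new int[] { failedOnce.size(), failedTwice.size() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the failures come back as ordinary return values, the retry is just a second call to the same loop with a shorter key list.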

Wednesday, September 9, 2009

Using ThreadLocal to pass in dummy components

Synchronization of data access across multiple threads is tricky.  While Java's threading primitives are fairly easy to understand and use, there can be unintended performance consequences of making an object thread safe. Depending on how objects synchronize other objects, you can also end up with deadlocks that are no fun to debug. For example if object1 tries to lock object2 while object2 is trying to lock object1, you're in for a long night.

In general, anything that reduces synchronization of data across threads reduces the potential for unintended consequences. An alternative to making an object threadsafe is to make it thread-local. A thread-local object provides a separate copy of itself for each thread. Each thread can only see its local instance of that object, and is free to modify it at will, without needing to synchronize.
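
A small sketch of that behavior, using hypothetical names (ThreadLocalSketch, COUNTER, bump): every thread that touches COUNTER gets its own array, so the increments need no locking and never leak between threads.

```java
class ThreadLocalSketch {

    // Each thread that calls COUNTER.get() lazily receives its own int[1].
    static final ThreadLocal<int[]> COUNTER = new ThreadLocal<int[]>() {
        @Override
        protected int[] initialValue() {
            return new int[] { 0 };
        }
    };

    // Increments the calling thread's private counter n times and returns it.
    static int bump(int n) {
        for (int i = 0; i < n; i++) {
            COUNTER.get()[0]++; // no lock needed: no other thread can see this array
        }
        return COUNTER.get()[0];
    }

    // Runs bump(n) on a fresh thread and returns what that thread saw.
    static int bumpOnNewThread(final int n) {
        final int[] seen = new int[1];
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                seen[0] = bump(n);
            }
        });
        t.start();
        try {
            t.join(); // join() makes the write to seen[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```

Calling bumpOnNewThread(3) and then bumpOnNewThread(5) yields 3 and then 5 rather than 3 and then 8, because the second thread starts from its own fresh copy.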

Thread-local variables used to have significant performance issues, and there were bugs in versions prior to 1.6. Also, it is easy to run out of memory with large numbers of threads using large thread-local objects. But assuming you go in with your eyes open, reducing synchronization across the threads in your application is good for performance and can significantly reduce complexity.

Another benefit of thread-local variables (as if simplification and performance gains aren't enough) is that they make it easy to swap in stub components at unit test time. Why would you do this instead of passing in the component? I ended up using thread-local variables for my components when I had to instantiate an object via the Class.forName() method, and didn't know/want to know about how to wire up dependent components. It's a trick I want to remember (so I'm writing it down :)

I implement the component as a thread-local variable via an anonymous inner class:

ThreadLocal<SequenceClient> sequenceClientLocal = new ThreadLocal<SequenceClient>() {
        @Override
        protected SequenceClient initialValue() {
            try {
                return SequenceClientImpl.getInstance(hosts,hexIdNode);
            } catch (Exception e) {
                // callers must be prepared for a null client if creation fails
                return null;
            }
        }
    };
    

In order to swap this default value out for a stub, I add a setter to override it:

public void setSequenceClientLocal(ThreadLocal<SequenceClient> sequenceClientLocal) {
        this.sequenceClientLocal = sequenceClientLocal;
    }

At unit test time, I can stub in a dummy class by calling the setter:

public class TestCollapserAgent {

  @Before
  public void setUp() throws Exception {

    sequenceClient = new DummySequenceClientImpl(1);

    collapserAgent.setSequenceClientLocal(new ThreadLocal<SequenceClient>() {
            @Override
            protected SequenceClient initialValue() {
                return sequenceClient;
            }
        });
    ....
  }

  // unit tests follow....

}

Thursday, September 3, 2009

The Ratio of Ceremony vs Essence, aka Framework Debt

This is going to be the kind of post I had sworn off of: a lot of opinion, with some rambling mixed in. I apologize in advance for that, but occasionally I need to vent about things that deeply disturb me, and venting tends to be nonlinear.

I just spent the last week trying to work with a legacy system component that was implemented using the Spring framework. This component read data from a database into a Lucene index wrapped by Compass. At the time of implementation, the lead engineer was using JPA to load database records into POJOs, which he then annotated so that they could be serialized via JAXB, which enabled Compass to read them in as Lucene Documents. Whew!

Because time was limited and the code was already in production, I decided to ignore my fundamental misgivings about frameworks and Java Acronyms, and make the minimal modifications to the existing source that would get it to take input from S3 instead of a database.

After a day of struggle, I had figured out what was going on, and was astounded by the amount of code required just to set up the relatively simple business logic. When I hit a 'schema not found' error trying to load the application.xml, I gave up, ripped out the business logic, and re-implemented the entire thing in a matter of hours. With a lot less code. I know that the original implementation of the Spring based code took a week or so to write.

The massive increase in efficiency is not because I'm a brilliant coder. I wish I was, but I've worked with brilliant coders and I'm not one of them. It's because the actual business logic was pretty minimal. The logic required to implement and maintain the Spring application required a lot of code that could only be described as Ceremonial, as opposed to Essential business logic. I first read about Ceremonial vs Essential code here, the night after I had exorcised Spring from the logic. The timing couldn't have been more appropriate.

What is Ceremonial code? It is code that has nothing to do with implementing a business requirement. In Spring, I define Ceremonial code as:
  1. Configuration as code
  2. Dependency Injection
  3. (Pedantic use of) Interfaces
The three examples above are not terribly bad, in fact they come from decent intentions ("the road to hell..."). But put together they have an exponentially bad effect. They are, when added to a developer's blind belief in the goodness of all things Frameworky, the Four Horsemen of the (Framework) Apocalypse.

Configuration As Code


Separating configuration into a data file is inherently a good idea. You don't want to hardcode variables that you would then have to rebuild the application to change. I'm not sure how this basically sound idea warped into "hey, let's put EVERYTHING into configuration", but the biggest problem with this approach is that now part of the logic is in code, the other part is in a massive XML file. You need both to understand the control flow of the application, so  you spend a lot of time toggling back and forth, saying "What class is being called? Oh, let me check in xml configuration. Oh, that's the class. Great. What was I doing?" Maybe some people see this kind of rapid mental stack management as interesting and novel brain training. I see it as wasting time, time that I could be spending either coding or testing a feature that someone is paying me to implement.

Dependency Injection


This, too, starts off as a great idea. One of the big issues people (OK, I'm talking about myself, but using the word 'people' to get some more legitimacy) had with EJB 2.0 code was that it was really hard to test. You had to have the whole stack up and running, and validating the integrity of different components in the stack was so hard we just didn't do it.

Dependency Injection/Inversion of Control allows you to parameterize the components of an object, making that object really easy to test. Just expose components with getters and setters, and you can dummy them up and test that object in true isolation! Again, there is still nothing really flawed at this point.

The major flaw in Dependency Injection comes at implementation.  Objects need all of their components in a known, initialized state, in order to function effectively. Dependency Injection as implemented in Spring is usually done in the configuration file. Objects that are created in the configuration file  have all of their components set in their configuration.

It is very easy to miss setting a component in the configuration file. This means that the object will initialize in a bad state that only becomes apparent when you try to use it. People use constructors because they can specify components as parameters to the constructor, which is an explicit way of saying "This object needs components X, Y, and Z to run".

Using a constructor provides a foolproof way to successfully initialize an object without having to test for initialization success. If the constructor returns, you're good. If not, you know that the object is not usable.

In order to be configurable via Spring's setter injection, which is what this codebase used, objects must (a) have a default (no argument) public constructor and (b) expose all of their required components via setters. There is no way to enforce that setup has been correct, so the developer has to spend time looking at the getters and setters of the object to determine what components they need to supply at configuration time. When I compare that effort to the effort of looking at the constructor parameters, it feels very inefficient.
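
To make the contrast concrete, here is a sketch with two versions of the same made-up component. DocumentSource, SetterLoader and ConstructorLoader are hypothetical names, not classes from the project, and no Spring is involved: the point is only when the missing dependency gets noticed.

```java
class InjectionSketch {

    // Hypothetical dependency, used only for illustration.
    interface DocumentSource {
        String fetch(String key);
    }

    // Setter-style: constructs fine even when the dependency is missing.
    static class SetterLoader {
        private DocumentSource source;

        public void setSource(DocumentSource source) {
            this.source = source;
        }

        public String load(String key) {
            return source.fetch(key); // NullPointerException here if setSource was never called
        }
    }

    // Constructor-style: the required component is part of the signature and checked up front.
    static class ConstructorLoader {
        private final DocumentSource source;

        public ConstructorLoader(DocumentSource source) {
            if (source == null) {
                throw new IllegalArgumentException("source is required");
            }
            this.source = source;
        }

        public String load(String key) {
            return source.fetch(key);
        }
    }

    // Returns true if a SetterLoader that was never wired up only fails at use time.
    static boolean setterFailsLate() {
        SetterLoader loader = new SetterLoader(); // construction succeeds
        try {
            loader.load("k");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Returns true if a ConstructorLoader refuses to be built without its dependency.
    static boolean constructorFailsEarly() {
        try {
            new ConstructorLoader(null);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }
}
```

Both versions are equally easy to stub in a unit test. The difference is that the constructor version cannot exist in a half-configured state.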

Pedantic Use of Interfaces


The goal of the Java Interface is to (a) separate functionality from initialization, and (b) provide a contract that a caller and callee can communicate across. This makes sense in the following two cases:
  1. You have a complex object and you only want to expose a part of it to a caller. For example you have a parent class and you want to expose a callback interface to the child class.
  2. You have multiple implementations of the same functionality and you don't want the caller to care about which object they are calling. 
What I see all over Java-Land, and especially in Spring, is interfaces being used because someone got pedantic about separating functionality from initialization. I fail to see the use of an interface when used to abstract the implementation of all of the methods of a single class. You're writing two structures when one could do the job just fine. Actually, you end up writing three structures: the interface, the implementation, and a factory object, which is more ceremonial code. Even if you need the interface, you could still have the implementation object return an instance of itself cast to the interface via a static initialization method:

public class S3AccessorImpl implements S3Accessor {

    private static final int DEFAULT_SET_SIZE = 1000;
    private S3Service service;
    private Logger logger;

    public static S3Accessor getInstance(AWSCredentials creds) throws S3ServiceException {
        return new S3AccessorImpl(creds);
    }
    
    
    protected S3AccessorImpl(AWSCredentials creds) throws S3ServiceException {
        logger = Logger.getLogger(this.getClass());
        service = new RestS3Service(creds);

    }
    ...
}

In spite of my comments above, I am a fan of using interfaces as the boundaries between components because it facilitates easier unit testing. But I'm not entirely sold on abstracting the creation of an object to a Factory that returns the interface that object implements -- not when the above method (a) hides creation from the caller and (b) doesn't require an extra class with a single 'createFoo' method.

Also, I don't understand always writing interfaces first, then implementation classes second. I tend to implement classes until I have a real need for an interface, i.e. during unit testing when I am going to submit a 'dummy' component in place of a real one. 

Conclusion


My recent experience with Spring has reminded me of the existence of 'Framework Debt'. Framework Debt is the Technical Debt required to implement a solution with a given Framework. In short it is determined by the ratio of time spent writing and maintaining ceremonial code vs the amount of time spent writing and maintaining essential business code. The problem with most frameworks, Spring included, is that they do not distinguish between ceremonial and essential code, because to them, it's _all_ essential code. And to work in that particular framework, ceremonial code is absolutely essential. Having to maintain and understand a bunch of logic that has nothing to do with application functionality seems inherently wrong to me.

I actually do like some frameworks I've run into. Rails is great because of its 'convention over configuration', but that is another kind of technical debt. Fortunately it is pretty low in Rails, and as a result applications can be rapidly developed in Rails without losing maintainability. But even Rails feels too heavy for me at times. I do write apps that don't need the overhead of MVC. For these apps, Sinatra allows me to quickly get path routing out of the way and concentrate on the underlying code.

Thursday, August 27, 2009

Zookeeper and Concurrency

Recently I ran into a problem that seemed to require more of a solution than I was willing to implement. We are currently migrating an application from using a single worker, single database to having multiple workers running in parallel using S3 as the primary means of storage. (Side note: this migration is only possible because the application doesn't actually require any of the unplanned, interactive queries that only a database is good at. The choice of a database as the persistence mechanism for this application was not a popular one, and only grew less popular as more time was spent putting out database related fires than implementing new features). One of the legacy requirements of the system, which supports an in production website, was that the IDs for all new items had to be distinct integers.

Without this legacy requirement I would have put in some kind of GUID scheme and called it an (early) day. The existence of previous items with IDs that other services relied upon made a GUID generation scheme not possible. However the requirement of distinct integers requires coordination between the agents, who would need to make sure they are not creating the same ID for different objects.

My initial thought was to implement a simple service that provided an integer that it would auto increment with every request. The big problem with this approach is that the service would be a massive, non redundant bottleneck unless it too was replicated, and then it would be faced with the same problem that the original workers faced wrt keeping integers synchronized across different processes.

So I had a bad feeling about starting down that path, and was putting it off, when a colleague at work suggested that I check out Zookeeper. Zookeeper was created by Yahoo! Research specifically to solve the kind of synchronization problems that I was having, in a highly performant, fault tolerant way. In other words, this was the service that I was trying not to write :)

Zookeeper at 10000 feet consists of multiple services that maintain a hierarchical namespace consisting of nodes that can have child nodes. Each node can have associated data, limited to under 1MB, meant to be used for coordination/synchronization.

In the words of its creators, Zookeeper "needed to be general enough to address our coordination needs and simple enough to implement a correct high performance service. We found that we were able to achieve {this} by trading strong synchronization for strong ordering guarantees and a wait-free interface."

What that means is that node access is 'no wait', meaning when you ask for data you get it, but you do not have an exclusive lock on the data. This is quite different than the mutex based locking model that I'm used to, and at first I didn't see how I could use this to guarantee unique IDs to multiple agents creating multiple items without getting an exclusive lock on the data and making a modification.

What I didn't get (until another colleague walked me through some of his code) is that any and all changes to the data are taken, and versioned. When I request data, I get back an object corresponding to the version of that data. When I submit data, I can specify that the submit will only succeed if the data (and therefore its version) hasn't been updated from the version that I have. So if I get node data that is an integer ID, increment it, and try to write it back, two things can happen (excluding connection loss, which must also be dealt with):
  1. I can return successfully, meaning that my change was accepted because no one else had made changes since I had retrieved the data.
  2. I can get a Bad Version exception, which means I need to get the data again, and try to re-increment the new value.
The code below shows the method that requests, receives, and attempts to increment the data:


public String getNextId() throws Exception {
    String hexId = null;

    while(true) {
        // get the session before the try, so zk is never null in the finally block
        ZooKeeper zk = getZooKeeper();
        try {
            // nodeStat is filled in with the version of the data that was read
            Stat nodeStat = new Stat();
            Stat setDataStat = null;
            byte data[] = getDataWithRetries(zk,
                                             sequenceName,
                                             nodeStat);
            ByteBuffer buf = ByteBuffer.wrap(data);

            long value = buf.getLong();

            value++;

            buf.rewind();

            buf.putLong(value);

            try {
                // only succeeds if the node is still at the version that was read
                setDataStat = setDataWithRetries(
                        zk,sequenceName,
                        buf.array(),nodeStat);
                hexId = Long.toHexString(value);
                break;
            }
            catch(KeeperException e) {
                if(!e.code().equals(Code.BADVERSION)) {
                    throw e;
                }
                // BADVERSION: someone else got there first, so loop and re-read
            }
        } finally {
            // always need to close out the session!
            zk.close();
        }
    }

    return hexId;
}

I've wrapped calls to zookeeper with a getZookeeper() method, and pass the retrieved Zookeeper instance into two methods: getDataWithRetries(), and setDataWithRetries(). Both methods try to recover from connection losses as best they can.

The getDataWithRetries method takes a Zookeeper instance, the path to the node being accessed, and a Stat structure that will contain retrieved data version information. It returns the retrieved data in a byte array. Note how in this method I'm only going to recover from connection losses, because this is a read operation.


protected byte[] getDataWithRetries(
        ZooKeeper zooKeeper,
        String path,
        Stat nodeStat) throws Exception {

    for(int i = 0; i < RETRY_COUNT; i++) {
        try {
            // nodeStat is populated with the version of the returned data
            return zooKeeper.getData(path,
                                     false,
                                     nodeStat);
        }
        catch(KeeperException e) {
            if(!e.code().equals(Code.CONNECTIONLOSS)) {
                throw e;
            }
            // connection loss: fall through and try again
        }
    }

    // every attempt ended in a connection loss
    throw new KeeperException.ConnectionLossException();
}


Once I have the data, I increment it. Note that Zookeeper data is always kept as a byte array, so I convert it in order to increment it:

ByteBuffer buf = ByteBuffer.wrap(data);

long value = buf.getLong();

value++;

buf.rewind();

buf.putLong(value);

and then try to resubmit it back to Zookeeper. This is where things get interesting. If someone has modified the data before I could get back to it, I need to get the new value of the data and try again. In the setDataWithRetries() method below, I only handle connection exceptions, and blow out if there is a BADVERSION exception:



protected Stat setDataWithRetries(
        ZooKeeper zooKeeper,
        String path,
        byte data[],
        Stat stat) throws Exception {

    for(int i = 0; i < RETRY_COUNT; i++) {
        try {
            // conditional write: fails with BADVERSION if the node has changed
            return zooKeeper.setData(path,
                                     data,
                                     stat.getVersion());
        }
        catch(KeeperException e) {
            if(!e.code().equals(Code.CONNECTIONLOSS)) {
                // includes BADVERSION, which the caller handles by re-reading
                throw e;
            }
            // connection loss: fall through and try again
        }
    }

    // every attempt ended in a connection loss
    throw new KeeperException.ConnectionLossException();
}

The calling code of setDataWithRetries() handles the BADVERSION exception by getting the data again, and retrying the submit:

try {
    setDataStat = setDataWithRetries(zk,
                                     sequenceName,
                                     buf.array(),
                                     nodeStat);
    hexId = Long.toHexString(value);
    break;
}
catch(KeeperException e) {
    if(!e.code().equals(Code.BADVERSION))
    {
        throw e;
    }
    // BADVERSION falls through, so the enclosing loop re-reads and tries again
}


So each agent tries to get an ID until they succeed, at which point they know they've got a unique one.
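
To convince myself that the read, increment, conditional-write loop really does hand out unique IDs, it helps to run it against something simpler than a live ensemble. The sketch below uses a hypothetical in-memory VersionedNode as a stand-in for a single Zookeeper node. It is NOT the Zookeeper API (there are no sessions, no connection loss, and writeIfVersion returns false where Zookeeper would throw a Bad Version exception), but the versioning rule is the same.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class VersionedCounterSketch {

    // What a reader gets back: a copy of the data plus the version it was read at.
    static final class Snapshot {
        final byte[] data;
        final int version;
        Snapshot(byte[] data, int version) {
            this.data = data;
            this.version = version;
        }
    }

    // Hypothetical in-memory stand-in for one node. The synchronized methods model
    // the server applying requests one at a time; callers never hold a lock across
    // a read and a write.
    static final class VersionedNode {
        private byte[] data = ByteBuffer.allocate(8).putLong(0L).array();
        private int version = 0;

        synchronized Snapshot read() {
            return new Snapshot(data.clone(), version);
        }

        // Writes only if nobody has written since expectedVersion.
        synchronized boolean writeIfVersion(byte[] newData, int expectedVersion) {
            if (expectedVersion != version) {
                return false; // the analogue of a Bad Version error
            }
            data = newData.clone();
            version++;
            return true;
        }
    }

    // Read, increment, and try to write back; start over if someone else won the race.
    static long nextId(VersionedNode node) {
        while (true) {
            Snapshot snap = node.read();
            ByteBuffer buf = ByteBuffer.wrap(snap.data);
            long value = buf.getLong() + 1;
            buf.rewind();
            buf.putLong(value);
            if (node.writeIfVersion(buf.array(), snap.version)) {
                return value;
            }
        }
    }

    // Runs several agents against one node and returns how many distinct IDs were handed out.
    static int distinctIds(int agents, final int idsPerAgent) {
        final VersionedNode node = new VersionedNode();
        final Set<Long> ids = Collections.synchronizedSet(new HashSet<Long>());
        Thread[] threads = new Thread[agents];
        for (int i = 0; i < agents; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < idsPerAgent; j++) {
                        ids.add(Long.valueOf(nextId(node)));
                    }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return ids.size();
    }
}
```

If two agents ever received the same ID, the set would come up short: distinctIds(4, 250) should report exactly 1000.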

The thing I really like about the strong versioning and ordering approach, now that I understand it, is that it acknowledges concurrency and makes it easy to deal with. Locking, on the other hand, seems like an attempt to abstract away the concurrency by enforcing serialization, which works OK when you are managing machine or process local resources, but can have huge performance impacts when you are trying to synchronize access across multiple machines.

The next thing I'm considering using Zookeeper for is configuration changes. Right now I push configuration changes out to my worker nodes by hand and force a restart via their web service interface. I would like them to be able to reload themselves automatically when state changes. This is a step up from the simple code detailed in this post: it means I need to use Zookeeper's notification capabilities to alert listening processes when the configuration changes.