Monday, December 21, 2009

Impedance Mismatch

Post Mortem:

Long story short: every time I've had a project fail, one recurring theme has been bad technology choices. There have been others, like group dysfunction, aka 'collective asshat fatigue', where the entire group stops functioning to avoid dealing with one or more aberrant personalities. Bad project scoping/definition also contributes to failure rates, but I don't think there is much intersection between groundbreaking work and the prototypical well-defined, well-understood project. So it would seem that issues of scoping and team dynamics are (a) exacerbated by, and therefore (b) secondary to, the bad technology choices that put the project in jeopardy.

I thought I had gotten better at detecting when I was making a bad technology choice, but I recently made one. Fortunately we were able to turn things around, but it was hard.  In the interest of not making this class of bad decision again -- because the definition of insanity is to do the same thing and expect different results --  I want to dissect what went wrong, what went right, and what I learned.

The Choice:

I recently started a new job. My first task was to jump in and assist on a prototype project by writing feed parsers that would parse millions of rows of comma separated values from feeds into various records in an SQL database. I was initially constrained to using Java.  I took a standard object=row approach persisting single objects at a time.

The code was written test-first (TDD), separation of concerns was good, and all unit tests passed with good coverage. Several hundred lines of code were required to parse, clean, validate, and insert data from various feeds into the database. I did not use an ORM; I used straight SQL via JDBC.

The Results:

The performance was horrendous. Inserting several million rows took hours, hours that we simply didn't have. The performance impacted the effectiveness of every downstream operation, and was jeopardizing the success of the overall project. Not a good way to start a new job :)

The Workaround:

We ended up ditching Java completely and relying on existing unix commandline tools to parse the files, insert into temp tables, and do bulk updates/inserts of rows from those temp tables into the main, 'canonical' tables. In other words, the hundreds of lines of Java parsing and insertion code that I wrote in a week or so (counting unit testing) and frantically reworked several times to try and speed up were replaced by something like this:

cat rawfile.csv | cut -d, -f1 | tr '[:upper:]' '[:lower:]' | tr -d '\r' | sort | uniq | psql -c "copy tablename from stdin using delimiters ','" && actual_queries.pl


This took a multi hour query down to 5 minutes. There was a bunch of pre-formatting prior to inserting into the database, and a perl script that ran afterwards, using DBI to copy/update from the temp table.

In general, the one rule that emerged was 'do as much processing as possible before going to the db'. For example, determining set exclusion/intersection, which is something I would have definitely gone to code or SQL for, can be done on the command line with the comm utility:

comm -12 <(sort file1) <(sort file2) gives the intersection of file1 and file2.
comm -13 <(sort file1) <(sort file2) gives unique lines from file2
etc.
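
For comparison, here is roughly what "going to code" for the same two operations looks like. This is a minimal sketch, not anything from the project: the class and method names are made up, and it assumes each input has no duplicate lines (as after sort | uniq), since comm itself treats repeated lines as a multiset.

```java
import java.util.Collection;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper (not from the original project): mirrors two comm
// invocations on in-memory collections of lines.
final class CommSketch {

    // Like `comm -12`: lines present in both inputs, in sorted order.
    static SortedSet<String> inBoth(Collection<String> file1, Collection<String> file2) {
        SortedSet<String> result = new TreeSet<>(file1);
        result.retainAll(new TreeSet<>(file2));
        return result;
    }

    // Like `comm -13`: lines present only in the second input, in sorted order.
    static SortedSet<String> onlyInSecond(Collection<String> file1, Collection<String> file2) {
        SortedSet<String> result = new TreeSet<>(file2);
        result.removeAll(new TreeSet<>(file1));
        return result;
    }
}
```

Even this small version needs both inputs in memory, which is one more reason the one-liner won at this scale.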

Conclusions:

Conclusion 1: Tests Still Required.

The good thing about piping a bunch of common unix tools together is that they have been around for a long, long time. Meaning you don't have to worry about the integrity of the data as much as you have to worry about using the tool options correctly. The bad thing about this approach is that the only kind of testing is integration testing, and it is easy to blow off when the initial solution works (or seems to).

After getting bitten when the queries worked but the data had integrity issues that manifested in the logic, we ended up writing a bunch of scripts that verified data integrity by making queries and inspecting result sets. We also leveraged the database, adding constraints that would allow the script to fail fast and alert us to schema integrity issues, like duplicate rows.

Conclusion 2: It's Not the Database's Fault.

The database is a very convenient scapegoat, but the truth  is that I spoon fed data into the database, and I could only move as fast as I could move my spoon (in Java). The better approach is to bulk feed data into the database, via bulk copies and bulk inserts/updates. Again, verification/validation scripts and constraints are required.
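
If you are stuck in Java, the closest JDBC equivalent to putting down the spoon is probably statement batching inside a single transaction. The sketch below is an assumption-laden illustration, not what we ran: the staging_rows table and its columns are invented, and batching is a different technique from the COPY used in the workaround, so it will likely still be slower than a true bulk copy.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Sketch only: staging_rows and its two columns are made-up names.
final class BatchInsertSketch {
    static final String INSERT_SQL =
        "INSERT INTO staging_rows (id, name) VALUES (?, ?)";

    // Sends rows to the database in batches of batchSize within one transaction.
    // Returns the number of executeBatch() round trips that were made.
    static int insertAll(Connection conn, List<String[]> rows, int batchSize)
            throws SQLException {
        boolean oldAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false);
        int roundTrips = 0;
        try (PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
            int pending = 0;
            for (String[] row : rows) {
                ps.setString(1, row[0]);
                ps.setString(2, row[1]);
                ps.addBatch();
                if (++pending == batchSize) {
                    ps.executeBatch();   // one round trip for the whole batch
                    roundTrips++;
                    pending = 0;
                }
            }
            if (pending > 0) {           // flush the final partial batch
                ps.executeBatch();
                roundTrips++;
            }
            conn.commit();
        } catch (SQLException e) {
            conn.rollback();             // keep the load all-or-nothing
            throw e;
        } finally {
            conn.setAutoCommit(oldAutoCommit);
        }
        return roundTrips;
    }
}
```

The point is the shape: many rows per round trip and one commit at the end, instead of one statement and one commit per object.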

Conclusion 3: SQL Good, ORM Bad.

The truth is that we could have done this in Java, had we just used the same SQL we ended up using in the Workaround. My mistake when using Java was to put on my ORM blinders, which are great for when I want to pretend that there is some arbitrary data store underneath my code. This works until it doesn't, usually at 12AM the day of a release.

Multiple FAILS mean I'm done pretending the database is some fuzzy abstract data 'store', because I will use one when I want to mine data along arbitrary axes -- in other words, I'll use a database precisely to use SQL and not some mapping to it. SQL is a mature and extremely powerful way to ask open ended questions of a schema. If I don't want to ask open ended questions of my data, I shouldn't use a database, because that's what they're built for. BTW I haven't used Hive or Pig yet, but these seem to be the QL solutions for much larger datasets than the one I was working with.

Conclusion 4: When in Doubt, Go Cheap, Go Fast.

However, just because we could have done it in Java doesn't mean we should have. Perl or Ruby or Python or Bash and the plethora of solid utilities available will now always be my first option when putting together a data input operation at this particular scale.

I think there will always be those opportunities that present themselves as vaguely defined chances to hit it big. Instead of taking lots of time up front to define the work involved at the expense of the actual opportunity, I'm going to move ahead with cheap and fast technologies that let me change path extremely quickly, because I'm sure I will need to at least once during the course of an ill defined project.

Conclusion 5: Keep the blinders off!

This entire experience was a huge reminder to me to be open minded about choosing the right tool for the job. This was an instance where I let the technology choice mandate my implementation decisions, instead of the other way around. Every time I do that, I get screwed. If, instead of putting my head down and running as fast as I could,  I had initially asked questions about the duration of the project, the intention of the code, the performance constraints on data input, etc, I could have easily justified the use of Perl/unix tools/raw SQL, and saved a lot of late nights/coding angst.

Conclusion 6: It's The People, Stupid

One thing that overwhelmingly shone through even in the grimmest of moments was the quality and class of the people I was working with. They all stayed completely focused on the solution, and did not point any fingers even when to do so would have been more than understandable. Furthermore, they were able to keep their sense of humor intact. While I didn't necessarily enjoy making this big of a mistake at a new job, the level of teamwork, professionalism, and respect from my new co-workers was complete confirmation of my reasons for jumping ship from my old company.

Thursday, October 15, 2009

Using Zookeeper to Implement a Redundant Persistence Service

Preface

I've previously detailed how we use Zookeeper to generate unique sequenceIDs for items created by a pool of worker processes. Zookeeper is the ideal solution for this problem because of its strict order/version enforcing that allows completely non-blocking access to data, its redundancy, and its easy to understand node and data paradigm.

Our current approach has multiple clients (one per worker process) requesting the latest sequence ID from a Zookeeper node. These sequence IDs are critical to the integrity of our data: they cannot be duplicated. Using Zookeeper version information, multiple clients request values and try to update those values with a specific version. This update will fail if someone else has updated the version in the meantime, so the client handles that failure, gets the latest value, increments it, and tries to update again. Again this is a non-blocking approach to updating a value and so there is no system level bottleneck.
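
The read / increment / conditional-write loop described above can be sketched without a ZooKeeper cluster. The class below is an in-memory stand-in I made up for illustration, not the ZooKeeper API; in real ZooKeeper the conditional write is setData with an expected version, which fails with a bad-version error when someone else got there first.

```java
import java.util.concurrent.atomic.AtomicReference;

// In-memory stand-in for a versioned node; NOT the ZooKeeper client API.
final class VersionedNodeSketch {

    // Immutable snapshot: the stored value plus the version it was written at.
    static final class Snapshot {
        final long value;
        final int version;
        Snapshot(long value, int version) {
            this.value = value;
            this.version = version;
        }
    }

    private final AtomicReference<Snapshot> state =
        new AtomicReference<>(new Snapshot(0L, 0));

    Snapshot read() {
        return state.get();
    }

    // Succeeds only if nobody has written since expectedVersion was read.
    boolean writeIfVersion(long newValue, int expectedVersion) {
        Snapshot current = state.get();
        if (current.version != expectedVersion) {
            return false;
        }
        return state.compareAndSet(current, new Snapshot(newValue, expectedVersion + 1));
    }

    // Client-side loop: read, increment, conditional write, retry on conflict.
    static long nextSequenceId(VersionedNodeSketch node) {
        while (true) {
            Snapshot seen = node.read();
            long next = seen.value + 1;
            if (node.writeIfVersion(next, seen.version)) {
                return next;
            }
            // Another client won the race; loop and try again with fresh data.
        }
    }
}
```

No client ever holds a lock here. A loser of the race just pays for one extra read and retry.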

Redundancy Required

In order to make our system truly redundant, we need to account for what happens if all ZooKeeper instances go offline by persisting the latest generated sequence IDs -- again, we absolutely must not duplicate IDs if we want to maintain system integrity. When we persist sequence IDs, it is possible to restart all ZooKeeper instances and have them pick up where they left off. Note that we can reduce the amount of persisting needed by 'reserving' a future ID, persisting it, and only modifying it when the generated IDs actually reach a reservation boundary. In other words, persist ID 100 up front, and update that value to 200 when the system generates ID = 100, so that the persisted value always stays ahead of every ID handed out. After a restart the sequence resumes from the persisted value. This maintains ID uniqueness across system restarts at the loss of at most 100 values, which is a decent tradeoff.
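
Here is a small sketch of that reservation idea, under the assumption that a restart resumes from the persisted value. The class is hypothetical, and a plain field stands in for the durable store. RESERVE_AMOUNT borrows the constant name from the watcher code later in this post.

```java
// Hypothetical sketch: 'persistedCeiling' stands in for durable storage.
final class ReservedSequenceSketch {
    static final long RESERVE_AMOUNT = 100;

    private long persistedCeiling; // durable: what would survive a restart
    private long current;          // in-memory only: lost on restart

    // Start (or restart) from the last persisted ceiling and immediately
    // reserve the next block before handing out any IDs.
    ReservedSequenceSketch(long lastPersistedCeiling) {
        this.current = lastPersistedCeiling;
        this.persistedCeiling = lastPersistedCeiling + RESERVE_AMOUNT;
    }

    // Hands out the next ID; persists a new ceiling at each block boundary
    // before the boundary ID is returned.
    long next() {
        current++;
        if (current % RESERVE_AMOUNT == 0) {
            persistedCeiling = current + RESERVE_AMOUNT;
        }
        return current;
    }

    long persistedCeiling() {
        return persistedCeiling;
    }
}
```

If the process dies after handing out ID 150, the persisted ceiling is 200, so a restart resumes at 201 and IDs 151 through 200 are simply skipped.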

Persistence via Watches

The simplest implementation of a persistence service takes advantage of ZooKeeper's watch functionality, which lets a client register for notifications when a node goes away or its value changes. The client gets notified every time a watched value changes, and receives an Event object with the details of the change. In our case, the client is a Persistence Service, which retrieves the location of the updated data from the Event object, retrieves the data, and determines whether it needs to reserve a future block of numbers as described above. Note that ZooKeeper watches are defined to be one time triggers, so it is necessary to reset the watch if you want to keep receiving notifications about the data of a specific node, or the children of a specific node.

You watch data by registering a Watcher callback interface with Zookeeper. The Watcher interface declares the process() method, which handles the WatchedEvent parameter. The following process() method determines when to persist the next reserved value.

public void process(WatchedEvent event) {
        Stat stat = new Stat();
        
        EventType type = event.getType();
        String node = event.getPath();
        if(type.equals(EventType.NodeDataChanged)) {
            
            try {
                
                byte inData[] = getData(sessionZooKeeper,event.getPath(),stat);
                currentSequenceID = SequenceHelper.longFromByteArray(inData);
                
                if(currentSequenceID % RESERVE_AMOUNT == 0) {
                    persistSequenceID(node,currentSequenceID+RESERVE_AMOUNT);
                }
                
            } catch (Exception e) {
                logger.error(e);
            }
            
        }
        else if(type.equals(EventType.NodeDeleted)) {
            logger.error("data node "+event.getPath()+" was deleted");
        }
        
        // every time you process a watch you need to re-register for the next one. 
        try {
            addWatch(sessionZooKeeper,this,node);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        
    }

In this case, I point Zookeeper to the Watcher object when I retrieve a ZooKeeper instance:

public ZooKeeper getZooKeeper() throws Exception {
        
        return new ZooKeeper(hostString,DEFAULT_TIMEOUT,watcher);
    }

Watchers, as is implied above, are bound to the lifecycle of the Zookeeper object they are passed into. Watchers can also be established when checking to see if a node exists, or when retrieving children of a node, because the client may want to be notified if a node is created or if children of a node are created.

Redundancy and Leader Election/Succession

Of course, a persistence service is useless if it is not redundant, especially if the integrity of our data requires us to persist reserved sequence IDs.  We only want one process at a time persisting sequence IDs. If that process goes down, we want another process to step in immediately.  In other words, we want a single leader to be selected from a group of 2 or more independent processes, and we want immediate succession if that leader were to go away.

In order to handle Leader Election and Leader Succession, the multiple persistence processes create and watch  Sequential-Ephemeral nodes. Sequential-Ephemeral nodes have the following properties:
  1. They automatically increment if there is a previous numbered node.
  2. They go away if the ZooKeeper instance  that created them goes away. 
The persistence processes use these two properties when they start up:
  1. They create a Sequential-Ephemeral node (note, this requires that they have a ZooKeeper instance open for as long as they are alive, so that the node sticks around).
  2. They check to see if there is a lower numbered Sequential-Ephemeral node.
  3. If not, they are the leader, and they register to watch for changes on the nodes used to track sequence IDs.
  4. If there is a lower numbered sequential-ephemeral node, they register to watch that node. Specifically, they want to get notified if that node goes away.
  5. They only want to watch the nearest lower numbered node. This avoids the 'swarm' that would happen if every backup watched the single leader node and all of them reacted at once when it went away. So succession is guaranteed to be orderly and only done by one node at a time.
public synchronized void startListening() throws Exception {
       
       // create a persistent node to store Sequential-Ephemerals under.
       if(checkExists(sessionZooKeeper, SEQUENCE_MAINTAINER_LEADER_NODE) == false) {
           createNode(sessionZooKeeper, SEQUENCE_MAINTAINER_LEADER_NODE, null,
              CreateMode.PERSISTENT);
       }
       
       // sequential nodes end in /n_

       String path = SEQUENCE_MAINTAINER_LEADER_NODE+PREFIX;
       createdNode= createNode(sessionZooKeeper, path, null,
           CreateMode.EPHEMERAL_SEQUENTIAL);
       
       // this method transforms the sequential node string to a number
       // for easy comparision
       int sequence = sequenceNum(createdNode);
       
       // only watch the sequence if we are the lowest node.
       if(doesLowerNodeExist(sequence) == false) {
           logger.debug("this node is the primary node");
           isPrimary = true;
           loadSequenceMaintainers();
       }
       else 
       {
           // this node is a backup, watch the next node for failure.
           isPrimary = false;
           
           watchedNode = getNextLowestNode(sequence);
           if(watchedNode != null) {
               logger.info("this node is not primary, it is watching "+watchedNode);
               boolean added = super.addWatch(sessionZooKeeper,this,watchedNode);
               if(added == false) {
                   throw new SequenceIDDoesntExist(watchedNode);
               }
           }
           else {
               throw new SequenceIDDoesntExist(watchedNode);
           }
       }
       
       isListening = true;
    }

In the code above, I've abstracted a lot of the details of interacting with Zookeeper under methods. Hopefully the method names make it clear what I'm doing. There is enough specific documentation about the ZooKeeper API; I'm more interested in showing the logic behind electing a leader.

The code above shows that only one process will be the leader. The other processes will be watching the next lowest node, waiting for it to fail. This watching, of course, is done via the Watcher::process() method:

public void process(WatchedEvent event) {
        
        EventType type = event.getType();
        String path = event.getPath();
        
        if((path != null && path.equals(watchedNode)) && (type != null &&
           type.equals(EventType.NodeDeleted))) {
            
            try {
                int watchSequence = this.sequenceNum(watchedNode);
                if(this.doesLowerNodeExist(watchSequence) == false) {
                    logger.debug("watcher of "+watchedNode+" is now primary");
                    isPrimary = true;
                    watchedNode = null;
                    // now you are the leader! the previous leader has gone away.
                    // note you are no longer watching anyone, so no need to
                    // re-register the watch.
                    loadSequenceMaintainers();
                }
                else {
                    logger.debug("watcher of "+watchedNode+" is not yet primary");
                    // there is lower node that is not the leader. 
                    // so watch it instead. 
                    watchedNode = getNextLowestNode(watchSequence);
                    boolean success = addWatch(sessionZooKeeper, this, watchedNode);
                    if(success == false) {
                        throw new SequenceIDDoesntExist(watchedNode);
                    }
                }
            } catch (Exception e) {
                // fail fast for now
                logger.error(e.getMessage());
                throw new RuntimeException(e);
            }
        }
         ......
    }

Note that if an intermediate process fails, the 'watching' process then gets the next available lowest node to watch and watches it. If all intermediate processes fail, the watching process becomes the leader.
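
The "nearest lower numbered node" rule is the heart of the succession logic, so here is a standalone sketch of it. The helper below is hypothetical and only loosely modeled on the sequenceNum() and getNextLowestNode() calls in the code above, whose real implementations are not shown. It assumes node names end in an underscore followed by a zero-padded number, such as n_0000000003.

```java
import java.util.List;

// Hypothetical helper; assumes names like "n_0000000003".
final class ElectionSketch {

    // Extracts the numeric suffix after the last underscore.
    static int sequenceNum(String nodeName) {
        return Integer.parseInt(nodeName.substring(nodeName.lastIndexOf('_') + 1));
    }

    // Returns the live node with the highest sequence number below our own,
    // or null when no lower node exists (meaning this process is the leader).
    static String nodeToWatch(String self, List<String> liveNodes) {
        int mine = sequenceNum(self);
        String best = null;
        for (String candidate : liveNodes) {
            int seq = sequenceNum(candidate);
            if (seq < mine && (best == null || seq > sequenceNum(best))) {
                best = candidate;
            }
        }
        return best;
    }
}
```

Re-running nodeToWatch() after a NodeDeleted event covers both cases: it either returns the next node down the chain or returns null, which is the signal to take over as leader.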

Dynamic Node Detection

Once these long lived services are up and running, I don't want to have to restart them if I am adding another sequence node to be tracked. We do this all of the time, because we are running multiple instances of the data pipeline and tracking sequences across all of them.
This again can be handled by setting a watch, this time on the children of a top level node, and restricting sequence node creation to directly underneath that node. In other words, have a zookeeper node called /all-sequences and stick sequence-1...sequence-N underneath it. We set the watch on the node when we check to see if it has children:
children = zooKeeper.getChildren(sequenceParentNode, true);
This registers the class that created the zooKeeper instance as the Watcher. In the process() handler, we detect whether children have been deleted or added. Unfortunately, we can only detect if children underneath a node have changed, so it is up to us to determine which ones have been deleted and which ones have been added:
public void process(WatchedEvent event) {
        
        EventType type = event.getType();
        String path = event.getPath();
        ....
        if(path != null && path.startsWith(sequenceParentNode) && (type != null)) {
            // getting this notification implies that you are a primary b/c that 
            // is the only way to register for it. 
            try {               
                if(type.equals(EventType.NodeChildrenChanged)) {
                    // figure out missing and new nodes (expensive, but this 
                    // only happens when a node is manually added)
                    List<String> newSequences = new ArrayList<String>();
                    List<String> missing = new ArrayList<String>();
                    List<String> children = this.getChildren(sessionZooKeeper,
                        sequenceParentNode,false);
                    for(String child : children) {
                        if(this.sequenceMaintainers.containsKey(child) == false) {
                            newSequences.add(child);
                        }
                    }
                    
                    for(String currentSequence : sequenceMaintainers.keySet()) {
                        if(children.contains(currentSequence) == false) {
                            missing.add(currentSequence);
                        }
                    }
                    
                    // add new sequences to watch list
                    for(String child : newSequences) {
                        String sequencePath = sequenceParentNode+"/"+child;
                        sequenceMaintainers.put(child,
                           new SequenceMaintainer(s3Accessor,
                             hosts,sequencePath,true));
                    }
                    
                    for(String child : missing) {
                        sequenceMaintainers.remove(child);
                    }
                    
                }
                
                boolean success = addWatch(sessionZooKeeper, this, 
                   sequenceParentNode);
                if(success == false) {
                    throw new SequenceIDDoesntExist(sequenceParentNode);
                }
            } catch (Exception e) {
                // fail fast for now
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
    }

Conclusion

I'm pretty amazed how much I was able to leverage two main points of functionality -- nodes and watches -- to build a persistent sequence watching service. Once again it seems that picking the right primitives is what makes ZooKeeper so practical for distributed synchronization.

Thursday, October 1, 2009

Quick Webserver setup with Jersey and Jetty

No More Hand Rolling

In our data pipeline, we have different components that we communicate with via web services.  In the beginning, there were only three commands needed: pause, restart, and reload. So I wrote a quick Servlet, loaded up embedded Jetty, and called it good. The Servlet contained some method handling code to parse path strings:

String path = request.getPathInfo();
        
        if(path.equals(ROOT)) {
            response.setContentType("text/html");
            response.setStatus(HttpServletResponse.SC_OK);
            response.getWriter().println("name");
            response.getWriter().println("getInfo()");
        }
        else if(path.equals(STATS)) {
            response.setContentType("text/json");
            response.setStatus(HttpServletResponse.SC_OK);
            
            JSONObject responseObject = new JSONObject();
            responseObject.put("service", name);
            responseObject.put("status",status.toString().toLowerCase());
            responseObject.put("statistics", jobStatsAnalyzer.toJSON());
            response.getWriter().println(responseObject);
        }
        else if(path.startsWith(SERVICE_PREFIX)) {
            response.setContentType("text/html");
            
            responseCode =  
              processServiceRequest(path.substring(SERVICE_PREFIX.length(),
                path.length()),response);
            response.setStatus(responseCode);
            
        }
        else {
           ......
        }
And the processServiceRequest call is equally complex because it has to parse the next section of the path. Still, because there were only three methods and it took little time to code up, I felt fine about hand rolling the Servlet, even though there was a lot of boilerplate path parsing code.

One of our components now needs (much) more path handling. It is a validation component -- it collects transformation exceptions thrown by the view processing components. Those exceptions are dumped to an SQS queue, picked up by the validator, and dumped into a Lucene index that allows us to query for exceptions by various exception metadata. The validator needs to expose a more complex REST interface that allows a data curator to find exceptions (resources) by that metadata (sub resources), e.g. by exception type. They can then fix the root cause of the exceptions, and validate that the exceptions go away via a set of scripts that query the validator web service.

One option to extend the current web service functionality would be to subclass the custom Servlet, but that's a lot more boilerplate code and I know that we are probably going to need to extend another component in another way, which would mean more code. More code to debug, more code to maintain, more code to understand.

Jersey is the reference implementation of JAX-RS (aka JSR 311), which allows you to compose RESTful services using annotations. It is an alternative to hand rolling servlets that lets you declaratively bind REST methods (GET/PUT/POST/etc), paths, and handler functions. It handles serializing data from POJOs to XML/JSON and vice versa. I had been wanting to check it out for some time now, but simply didn't have a concrete need to do so.

Jersey And Jetty

I decided to stick with Jetty as my servlet container because launching an embedded instance was so brain-dead simple. But I decided to use the Jersey servlet and see how hard it would be to re-implement my hand rolled servlet. Binding the Jersey servlet to Jetty uses Jetty's ServletHolder class to instantiate the Jersey servlet and initialize its annotation location as well as the location of the resources it is going to use to handle web requests. The code below shows how the Jetty ServletHolder is initialized with the Jersey ServletContainer (which actually implements the standard Servlet interface) and then bound to a context that allows the ServletContainer to handle all requests to the server.
sh = new ServletHolder(ServletContainer.class);
        
sh.setInitParameter("com.sun.jersey.config.property.resourceConfigClass", RESOURCE_CONFIG);
sh.setInitParameter("com.sun.jersey.config.property.packages", handlerPackageLocation);
        
server = new Server(port);
        
Context context = new Context(server, "/", Context.SESSIONS);
context.addServlet(sh, "/*");
server.start();

The com.sun.jersey.config.property.packages parameter points to the location of the Jersey annotated resources that the Jersey ServletContainer uses when handling requests. Those resources are simply POJOs (Plain Old Java Objects) marked up with Jersey annotations.

Jersey Resources

In order to parse a specific path, you create an object and use the @Path annotation. A method in the POJO is bound to that path by default. You can also parse subpaths by binding them to other methods via the @Path annotation. Here is an example:
@Path("/")
public class DefaultMasterDaemonService {

        
    private ServiceHandler serviceHandler;


    // this one handles root requests
    @GET
    @Produces("text/plain")
    public String getInformation() {
        return serviceHandler.getInfo();
    }
    
    // this one handles /stats requests
    @GET
    @Path("/stats")
    @Produces("application/json")
    public DaemonStatus getStatus() {
        return serviceHandler.getStatus();
        
    }
    .....
}

Basic Annotations

There are a couple of annotations above worth discussing in addition to the @Path annotation.
The HTTP method that is bound to the POJO method is specified via the @GET, @POST, @PUT, @DELETE, and @HEAD annotations.
The returned content Mime type is specified with the @Produces annotation. In the example above, a request to the root path returns some informational text, and a request to /stats returns JSON.

Returning JSON and XML

In order to return JSON/XML, you need to leverage JAXB annotations to make your data objects serializable to JSON/XML. Note: remember to always include a default constructor on your data objects. Otherwise you get exceptions trying to serialize those objects.

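I also found that I got serialization errors unless I left out getters and setters. I had not seen this before and at first assumed it was something specific to Jersey serialization. It is more likely JAXB's default access type (public members): an annotated public field plus a public getter/setter pair for the same property gets mapped twice, and declaring @XmlAccessorType(XmlAccessType.FIELD) on the class is the usual way to keep both.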

Here is an example of a JAXB annotated object that I use to return Status:
@XmlRootElement()
public class DaemonStatus {
    // apparently methods to access these are added at serialization time??
    @XmlElement
    public String serviceName;
    @XmlElement
    public String status;
    @XmlElement
    public JobStatsData jobStatsData;
    
    // need this one!
    public DaemonStatus() {
        
    }
    
    public DaemonStatus(String serviceName,String status,JobStatsData jobStatsData) {
        this.serviceName = serviceName;
        this.status = status;
        this.jobStatsData = jobStatsData;
        
    }
}
So all I needed to do to get JSON/XML in my return type was to create a JAXB annotated object, and specify what I wanted the method to produce via the Jersey @Produces annotation. Less code = more fun!

Parameterizing Path Components

Our components have Pause/Restart/Reload functionality accessible via the http://host/services/{pause|restart|reload} path, using POST. Jersey lets me parameterize the last part of the path, which makes the syntax of the command explicit while allowing me to only code string matching for the parameterized part:

@POST
@Path("/service/{action}")
public void doAction(@PathParam("action") String action) throws Exception {
        
  if(action.equals(MasterDaemon.PAUSE)) {
    serviceHandler.pause();
  }
  else if(action.equals(MasterDaemon.RELOAD)) {
    serviceHandler.reload();
  }
  else if(action.equals(MasterDaemon.RESUME)) {
    serviceHandler.resume();
  }
  else {
    throw new Exception("No such action supported: "+action);
  }
}
I've delegated the real meat of the action to a serviceHandler component, but this kind of path handling is about as easy as it gets. Note that the action parameter is specified via the @PathParam annotation directly in the method argument list.

Conclusion

I only really scratched the surface of what Jersey can do. In my case I don't have to parse query parameters, but that is easily done by specifying a @QueryParam argument to the handler method in the same way I specified the @PathParam. Query params arrive as strings, though as far as I can tell JAX-RS will also convert them to primitives, or to types that have a String constructor or a static valueOf(String) method.

I really liked how quickly I was able to toss out my hand coded servlet and trade up to the Jersey one. Other people on the team were able to wire up rich REST interfaces on several components almost immediately, which let all of us go back to focusing on real requirements.

I usually 'cast a jaundiced eye' towards anything that even has a hint of framework in it, but Jersey was super fast to learn and using it instead of hand coded servlets has already saved us a lot of time and finger strain.

Thursday, September 10, 2009

Using ExecutorCompletionService to synchronize multithreaded workflow

Today I ran into a problem where I needed to make sure that one multithreaded phase of processing had completely ended before starting another. Specifically, I was retrieving documents from S3 to load into a Lucene index, and wanted to retry all document requests that had failed due to S3 flakiness or connectivity issues, i.e. standard distributed computing error conditions.

In other situations requiring synchronization between threads, I've used a CountDownLatch. This works really well when you know the exact number of threads that you need to synchronize. You initialize the latch with the number of threads that you are synchronizing. When they finish work they decrement the latch, and when the latch count goes to 0 you continue processing.
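
For reference, the latch pattern looks roughly like this. It is a minimal sketch with made-up names, and the "work" is just a counter increment standing in for whatever each thread really does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch; class and method names are made up for illustration.
final class LatchSketch {

    // Starts workerCount threads, blocks until all of them have finished,
    // and returns how many completed their (stand-in) work.
    static int runAndWait(int workerCount) throws InterruptedException {
        final CountDownLatch done = new CountDownLatch(workerCount);
        final AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workerCount; i++) {
            new Thread(() -> {
                try {
                    finished.incrementAndGet(); // stand-in for real work
                } finally {
                    done.countDown();           // always release the latch
                }
            }).start();
        }
        done.await(); // returns once the count has reached zero
        return finished.get();
    }
}
```

Notice that the count is fixed when the latch is constructed, which is exactly the limitation that matters in the next paragraph.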

This time was different because instead of synchronizing threads, I was trying to halt processing until all asynchronously submitted tasks had completed processing. I was queuing up several hundred thousand tasks into a thread pool, and did not know when that thread pool would be finished with the work, or how many threads would be running when the entire job completed, or even exactly how many tasks I had to run -- that number depended on the number of documents being fetched, which is always growing.

Fortunately my situation was not a unique one.  I figured that the first place to look was the java concurrency library, and when I did some research, I found that ExecutorCompletionService was exactly what I needed.

ExecutorCompletionService works with a supplied Executor using Runnable/Callable tasks. I decided to use Callable tasks, as they allowed me to return and inspect a value, and throw exceptions.  As those tasks complete, they are placed onto a queue that can be accessed via the poll() or take() methods. This approach simplified my life:
  1. It allowed me to use tasks and task status to control logic flow instead of threads and thread status. 
  2. It freed me up from having to know how many threads or tasks I was working with.
  3. It provided me with an access point to each task that I could use to analyze the results.
  4. When the ExecutorCompletionService queue was empty, I knew it was time to then retry all failed results. 
Point (1) above is the foundation for the points that follow. When I used Future and Callable to implement the work I needed to do, I was able to return the results I cared about and process them. Specifically it allowed the code that ran the original key fetch loop to not worry about tracking and storing exceptions, which made for much simpler looping logic.
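
The pattern described above can be boiled down to a small, self-contained sketch. It is not the code from this project: the class name CompletionServiceSketch, the fetch() helper, and the rule that keys starting with "bad" fail are all made up for illustration. Each task reports the key it failed on (or null on success), and the caller drains exactly as many results as it submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionServiceSketch {

    // Hypothetical stand-in for a document fetch: keys starting with "bad" fail.
    static String fetch(String key) throws Exception {
        if (key.startsWith("bad")) {
            throw new Exception("simulated fetch failure for " + key);
        }
        return "doc-" + key;
    }

    // Submits one task per key, waits for every task, and returns the failed keys.
    static List<String> fetchAll(List<String> keys) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        CompletionService<String> completionService =
                new ExecutorCompletionService<String>(executor);
        List<String> failedKeys = new ArrayList<String>();
        try {
            int submitted = 0;
            for (final String key : keys) {
                completionService.submit(new Callable<String>() {
                    @Override
                    public String call() {
                        try {
                            fetch(key);
                            return null;   // null means success
                        } catch (Exception e) {
                            return key;    // report the key so it can be retried
                        }
                    }
                });
                submitted++;
            }
            // take() blocks until the next task finishes, in completion order.
            for (int i = 0; i < submitted; i++) {
                String failedKey = completionService.take().get();
                if (failedKey != null) {
                    failedKeys.add(failedKey);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            // not expected here, because call() handles its own exceptions
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
        return failedKeys;
    }
}
```

Calling CompletionServiceSketch.fetchAll(keys) returns only the keys that need a retry. The order of that list depends on which tasks finish first, so sort it before comparing it against anything.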

ExecutorCompletionService is a great example of how picking the right primitives makes it easy to compose highly functional helper objects. In this case the primitives involved were a (thread pool) executor and a (linked blocking) queue. (side note: I don't mean to sound like such a concurrent lib fanboy, but I'm really happy I didn't have to write this code myself, and really happy that the choice of primitives that the authors used enabled creation of classes like the ExecutorCompletionService. This stuff used to take significant effort on my part, and significant bugs were usually introduced :)


Here is an example of the ExecutorCompletionService in action:

public void buildEntireIndex() throws Exception {

        boolean moreKeys = true;
        int submittedJobCt = 0;
        // tracking all failed keys for later retry
        Map failedKeys = new HashMap();
        
        // ErrorCapturingExecutor is subclassed from ThreadPoolExecutor.
        // I override ThreadPoolExecutor.afterExecute() to queue and later analyze exceptions.
        LinkedBlockingQueue<Runnable> linkedBlockingQueue = new LinkedBlockingQueue<Runnable>();
        ErrorCapturingExecutor executor = new ErrorCapturingExecutor(threadCount,
            threadCount, 0L, TimeUnit.SECONDS, linkedBlockingQueue);

        // CompletionService works with the supplied executor and queues up tasks as they finish.
        executorCompletionService = new ExecutorCompletionService<BuildResults>(executor);
        
        
        String lastKey = null;
        
        while(moreKeys  == true) {    
            Set keys =  viewKeyRetriever.retrieveKeys(lastKey);
            
            if(keys.size() > 0) {
                String array[] = new String[keys.size()];
                keys.toArray(array); 
                lastKey = array[keys.size()-1];
                
                // I need to keep the number of waiting tasks bounded, so I wait
                // until the queue has drained below the limit before submitting more.
                while(linkedBlockingQueue.size() > MAXQUEUESIZE) {
                    Thread.sleep(WAITTIME);
                }
                
                // this is where I actually submit tasks
                processKeys(keys);
                
                // I only know how many jobs I need to wait for when I've retrieved all keys.
                submittedJobCt++; 
                
            }
            else {
                moreKeys = false;
            }
            
        }

        
        // I use the ExecutorCompletionService queue to check on jobs as they complete. 
        for(int i = 0; i < submittedJobCt;i++) {
            
            Future<BuildResults> finishedJob = executorCompletionService.take();

            // at this point, all I really care about is failures that I need to retry.
            // get() rethrows any exception from the task, wrapped in an ExecutionException.
            BuildResults results = finishedJob.get();
            
            if(results.hasFailures()) {
                failedKeys.putAll(results.getFailedKeys());
            }
            
            indexBuilderMonitor.update(results);
        }
        
        // I can use failedKeys to retry processing on keys that have failed. 
        // Logic omitted for clarity.
        
        ...
        executor.shutdown();

    }


The processKeys method is shown below. I broke it out because I needed to call it again when re-processing failed keys:

private void processKeys(Set keys) {
        // the builder builds the index from the content retrieved using the passed in keys.
        final IndexBuilder builder = indexBuilderFactory.createBuilder(keys);

        executorCompletionService.submit(new Callable<BuildResults>() {
            @Override
            public BuildResults call() throws Exception {
                BuildResults buildResults = null;
                try {
                    // buildResults contains all information I need to post process the task.
                    buildResults = builder.build();
                } catch (Exception e) {
                    throw e; // caught by ErrorCapturingExecutor
                }

                return buildResults;
            }
        });

    }
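
The ErrorCapturingExecutor itself is not shown in this post, so the following is only a guess at its shape, following the pattern documented for ThreadPoolExecutor.afterExecute(). The class name ErrorCapturingExecutorSketch, the getErrors() accessor, and the demo() method are assumptions for illustration. The point it shows is that a task submitted through submit() (or through an ExecutorCompletionService) is wrapped in a Future, so the Throwable argument is null and the failure has to be pulled out of the Future.

```java
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical reconstruction: the original ErrorCapturingExecutor is not shown.
class ErrorCapturingExecutorSketch extends ThreadPoolExecutor {

    private final Queue<Throwable> errors = new ConcurrentLinkedQueue<Throwable>();

    ErrorCapturingExecutorSketch(int threadCount) {
        super(threadCount, threadCount, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Submitted tasks are wrapped in a Future that holds on to the exception,
        // so t is null here and the failure must be read back from the Future.
        if (t == null && r instanceof Future<?>) {
            try {
                ((Future<?>) r).get();   // the task has already finished; this does not block
            } catch (CancellationException e) {
                t = e;
            } catch (ExecutionException e) {
                t = e.getCause();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            errors.add(t);
        }
    }

    Queue<Throwable> getErrors() {
        return errors;
    }

    // Usage: runs one failing and one succeeding task, returns the number of captured errors.
    static int demo() {
        ErrorCapturingExecutorSketch executor = new ErrorCapturingExecutorSketch(2);
        executor.submit(new Runnable() {
            public void run() {
                throw new IllegalStateException("simulated failure");
            }
        });
        executor.submit(new Runnable() {
            public void run() {
                // succeeds
            }
        });
        executor.shutdown();
        try {
            // afterExecute runs on the worker threads, so wait for them to finish
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executor.getErrors().size();
    }
}
```

Because afterExecute() runs on the worker thread after the task completes, the error queue is only guaranteed to be complete once the pool has terminated, which is why demo() waits on awaitTermination() before reading it.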

Wednesday, September 9, 2009

Using ThreadLocal to pass in dummy components

Synchronization of data access across multiple threads is tricky.  While Java's threading primitives are fairly easy to understand and use, there can be unintended performance consequences of making an object thread safe. Depending on how objects synchronize other objects, you can also end up with deadlocks that are no fun to debug. For example if object1 tries to lock object2 while object2 is trying to lock object1, you're in for a long night.

In general, anything that reduces synchronization of data across threads reduces the potential for unintended consequences. An alternative to making an object thread safe is to make it thread-local. A thread-local variable holds a separate copy of its value for each thread. Each thread can only see its own instance of that object, and is free to modify it at will, without needing to synchronize.
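
To make the "separate copy per thread" behaviour concrete, here is a minimal sketch. The class name ThreadLocalSketch and the counter it holds are invented for illustration and have nothing to do with the components discussed later in this post. Two threads increment what looks like the same variable with no synchronization, and each ends up with its own total.

```java
class ThreadLocalSketch {

    // Each thread that touches this variable gets its own int[] counter.
    static final ThreadLocal<int[]> COUNTER = new ThreadLocal<int[]>() {
        @Override
        protected int[] initialValue() {
            return new int[] { 0 };
        }
    };

    // Runs two threads that each increment their own copy 1000 times.
    // Returns what thread 0 saw, what thread 1 saw, and what the caller sees.
    static int[] demo() {
        final int[] seen = new int[2];
        Thread[] threads = new Thread[2];
        for (int t = 0; t < threads.length; t++) {
            final int index = t;
            threads[t] = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < 1000; i++) {
                        COUNTER.get()[0]++;   // no synchronization needed
                    }
                    seen[index] = COUNTER.get()[0];
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // The calling thread's copy is created by initialValue() on this first access.
        return new int[] { seen[0], seen[1], COUNTER.get()[0] };
    }
}
```

Each worker thread sees exactly 1000, and the calling thread still sees its own untouched 0. With a shared unsynchronized counter the two workers would race and the totals would be unpredictable.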

Thread-local variables used to have significant performance issues, and there were bugs in versions prior to 1.6. It is also easy to run out of memory when large numbers of threads each hold a large thread-local object. But assuming you go in with your eyes open, reducing synchronization across the threads in your application is good for performance and can significantly reduce complexity.

Another benefit of thread-local variables (as if simplification and performance gains aren't enough) is that they make it easy to swap in stub components at unit test time. Why would you do this instead of passing in the component? I ended up using thread-local variables for my components when I had to instantiate an object via the Class.forName() method, and didn't know/want to know about how to wire up dependent components. It's a trick I want to remember (so I'm writing it down :)

I implement the component as a thread-local variable via an anonymous inner class:

ThreadLocal<SequenceClient> sequenceClientLocal = new ThreadLocal<SequenceClient>() {
        @Override
        protected SequenceClient initialValue() {
            SequenceClient sequenceClient =  null;
            try {
                sequenceClient = SequenceClientImpl.getInstance(hosts,hexIdNode);
            } catch (Exception e) {
                sequenceClient = null;
            }
            return sequenceClient;
        }
    };
    

In order to swap this default value out for a stub, I add a setter to override it:

public void setSequenceClientLocal(ThreadLocal<SequenceClient> sequenceClientLocal) {
        this.sequenceClientLocal = sequenceClientLocal;
    }

At unit test time, I can stub in a dummy class by calling the setter:

public class TestCollapserAgent {

  @Before
  public void setUp() throws Exception {

    sequenceClient = new DummySequenceClientImpl(1);

    collapserAgent.setSequenceClientLocal(new ThreadLocal<SequenceClient>() {
            @Override
            protected SequenceClient initialValue() {
                return sequenceClient;
            }
        });
    ....
  }

  // unit tests follow....

}

Thursday, September 3, 2009

The Ratio of Ceremony vs Essence, aka Framework Debt

This is going to be the kind of post I had sworn off of: a lot of opinion, with some rambling mixed in. I apologize in advance for that, but occasionally I need to vent about things that deeply disturb me, and venting tends to be nonlinear.

I just spent the last week trying to work with a legacy system component that was implemented using the Spring framework. This component read data from a database into a Lucene index wrapped by Compass. At the time of implementation, the lead engineer was using JPA to load database records into POJOs, which he then annotated so that they could be serialized via JAXB, which enabled Compass to read them in as Lucene Documents. Whew!

Because time was limited and the code was already in production, I decided to ignore my fundamental misgivings about frameworks and Java Acronyms, and make the minimal modifications to the existing source that would get it to take input from S3 instead of a database.

After a day of struggle, I had figured out what was going on, and was astounded by the amount of code required just to set up the relatively simple business logic. When I hit a 'schema not found' error trying to load the application.xml, I gave up, ripped out the business logic, and re-implemented the entire thing in a matter of hours. With a lot less code. I know that the original implementation of the Spring based code took a week or so to write.

The massive increase in efficiency is not because I'm a brilliant coder. I wish I was, but I've worked with brilliant coders and I'm not one of them. It's because the actual business logic was pretty minimal. The logic required to implement and maintain the Spring application required a lot of code that could only be described as Ceremonial, as opposed to Essential business logic. I first read about Ceremonial vs Essential code here, the night after I had exorcised Spring from the logic. The timing couldn't have been more appropriate.

What is Ceremonial code? It is code that has nothing to do with implementing a business requirement. In Spring, I define Ceremonial code as:
  1. Configuration as code
  2. Dependency Injection
  3. (Pedantic use of) Interfaces
The three examples above are not terribly bad, in fact they come from decent intentions ("the road to hell..."). But put together they have an exponentially bad effect. They are, when added to a developer's blind belief in the goodness of all things Frameworky, the Four Horsemen of the (Framework) Apocalypse.

Configuration As Code


Separating configuration into a data file is inherently a good idea. You don't want to hardcode variables that you would then have to rebuild the application to change. I'm not sure how this basically sound idea warped into "hey, let's put EVERYTHING into configuration", but the biggest problem with this approach is that now part of the logic is in code, the other part is in a massive XML file. You need both to understand the control flow of the application, so  you spend a lot of time toggling back and forth, saying "What class is being called? Oh, let me check in xml configuration. Oh, that's the class. Great. What was I doing?" Maybe some people see this kind of rapid mental stack management as interesting and novel brain training. I see it as wasting time, time that I could be spending either coding or testing a feature that someone is paying me to implement.

Dependency Injection


This, too, starts off as a great idea. One of the big issues people (OK, I'm talking about myself, but using the word 'people' to get some more legitimacy) had with EJB 2.0 code was that it was really hard to test. You had to have the whole stack up and running, and validating the integrity of different components in the stack was so hard we just didn't do it.

Dependency Injection/Inversion of Control allows you to parameterize the components of an object, making that object really easy to test. Just expose components with getters and setters, and you can dummy them up and test that object in true isolation! Again, there is still nothing really flawed at this point.

The major flaw in Dependency Injection comes at implementation.  Objects need all of their components in a known, initialized state, in order to function effectively. Dependency Injection as implemented in Spring is usually done in the configuration file. Objects that are created in the configuration file  have all of their components set in their configuration.

It is very easy to miss setting a component in the configuration file. This means that the object will initialize in a bad state that becomes apparent when you try to use it. People use constructors because they can specify components as parameters to the constructor, which is an explicit way of saying "This component needs components X, Y, and Z to run".

Using a constructor provides a foolproof way to successfully initialize an object without having to test for initialization success. If the constructor returns, you're good. If not, you know that the object is not usable.

In order to be configurable via Spring, objects must (a) have a default (no argument) public constructor and (b) expose all of their required components via setters. There is no way to enforce that setup has been done correctly, so the developer has to spend time looking at the getters and setters of the object to determine what components they need to supply at configuration time. When I compare that effort to the effort of looking at the constructor parameters, it feels very inefficient.
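
The difference between the two initialization styles is easier to see side by side. The sketch below uses invented names throughout (InjectionSketch, Store, SetterIndexer, ConstructorIndexer) and no Spring at all. It only illustrates the argument above: the setter-style object can be created in an unusable state and fails later at the point of use, while the constructor-style object fails immediately or not at all.

```java
class InjectionSketch {

    // Hypothetical component interface, for illustration only.
    interface Store {
        String read(String key);
    }

    // Setter style: a no-argument constructor plus setters, as configuration-file wiring expects.
    static class SetterIndexer {
        private Store store;

        public void setStore(Store store) {
            this.store = store;
        }

        public String index(String key) {
            // throws NullPointerException if the setter was never called
            return "indexed:" + store.read(key);
        }
    }

    // Constructor style: the object cannot exist without its component.
    static class ConstructorIndexer {
        private final Store store;

        public ConstructorIndexer(Store store) {
            if (store == null) {
                throw new IllegalArgumentException("store is required");
            }
            this.store = store;
        }

        public String index(String key) {
            return "indexed:" + store.read(key);
        }
    }
}
```

Forgetting setStore() on a SetterIndexer goes unnoticed until index() is called, which may be long after startup. Passing a missing Store to ConstructorIndexer is rejected on the line that creates it, and the constructor signature documents the requirement.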

Pedantic Use of Interfaces


The goal of the Java Interface is to (a) separate functionality from initialization, and (b) provide a contract that a caller and callee can communicate across. This makes sense in the following two cases:
  1. You have a complex object and you only want to expose a part of it to a caller. For example you have a parent class and you want to expose a callback interface to the child class.
  2. You have multiple implementations of the same functionality and you don't want the caller to care about which object they are calling. 
What I see all over Java-Land, and especially in Spring, is interfaces being used because someone got pedantic about separating functionality from initialization. I fail to see the use of an interface when used to abstract the implementation of all of the methods of a single class. You're writing two structures when one could do the job just fine. Actually, you end up writing three structures: the interface, the implementation, and a factory object, which is more ceremonial code. Even if you need the interface, you could still have the implementation object return an instance of itself cast to the interface via a static initialization method:

public class S3AccessorImpl implements S3Accessor {

    private static final int DEFAULT_SET_SIZE = 1000;
    private S3Service service;
    private Logger logger;

    public static S3Accessor getInstance(AWSCredentials creds) throws S3ServiceException {
        return new S3AccessorImpl(creds);
    }
    
    
    protected S3AccessorImpl(AWSCredentials creds) throws S3ServiceException {
        logger = Logger.getLogger(this.getClass());
        service = new RestS3Service(creds);

    }
    ...
}

In spite of my comments above, I am a fan of using interfaces as the boundaries between components because it facilitates easier unit testing. But I'm not entirely sold on abstracting the creation of an object to a Factory that returns the interface that object implements -- not when the above method (a) hides creation from the caller and (b) doesn't require an extra class with a single 'createFoo' method.

Also, I don't understand always writing interfaces first, then implementation classes second. I tend to implement classes until I have a real need for an interface, i.e. during unit testing when I am going to submit a 'dummy' component in place of a real one. 

Conclusion


My recent experience with Spring has reminded me of the existence of 'Framework Debt'. Framework Debt is the Technical Debt required to implement a solution with a given Framework. In short, it is determined by the ratio of time spent writing and maintaining ceremonial code to the time spent writing and maintaining essential business code. The problem with most frameworks, Spring included, is that they do not distinguish between ceremonial and essential code, because to them, it's _all_ essential code. And, to work in that particular framework, ceremonial code is absolutely essential. Having to maintain and understand a bunch of logic that has nothing to do with application functionality seems inherently wrong to me.

I actually do like some frameworks I've run into. Rails is great because of its 'convention over configuration', but that is another kind of technical debt. Fortunately it is pretty low in Rails, and as a result applications can be rapidly developed in Rails without losing maintainability. But even Rails feels too heavy for me at times. I do write apps that don't need the overhead of MVC. For these apps, Sinatra allows me to quickly get path routing out of the way and concentrate on the underlying code.

Thursday, August 27, 2009

Zookeeper and Concurrency

Recently I ran into a problem that seemed to require more of a solution than I was willing to implement. We are currently migrating an application from using a single worker, single database to having multiple workers running in parallel using S3 as the primary means of storage. (Side note: this migration is only possible because the application doesn't actually require any of the unplanned, interactive queries that only a database is good at. The choice of a database as the persistence mechanism for this application was not a popular one, and only grew less popular as more time was spent putting out database related fires than implementing new features). One of the legacy requirements of the system, which supports an in production website, was that the IDs for all new items had to be distinct integers.

Without this legacy requirement I would have put in some kind of GUID scheme and called it an (early) day. The existence of previous items with IDs that other services relied upon made a GUID generation scheme not possible. However the requirement of distinct integers requires coordination between the agents, who would need to make sure they are not creating the same ID for different objects.

My initial thought was to implement a simple service that provided an integer that it would auto increment with every request. The big problem with this approach is that the service would be a massive, non redundant bottleneck unless it too was replicated, and then it would be faced with the same problem that the original workers faced wrt keeping integers synchronized across different processes.

So I had a bad feeling about starting down that path, and was putting it off, when a colleague at work suggested that I check out Zookeeper. Zookeeper was created by Yahoo Research specifically to solve the kind of synchronization problems that I was having, in a highly performant, fault tolerant way. In other words, this was the service that I was trying not to write :)

Zookeeper at 10000 feet consists of multiple services that maintain a hierarchical namespace consisting of nodes that can have child nodes. Each node can have associated data, limited to under 1MB, meant to be used for coordination/synchronization.

Zookeeper, in the words of its creators, "needed to be general enough to address our coordination needs and simple enough to implement a correct high performance service. We found that we were able to achieve {this} by trading strong synchronization for strong ordering guarantees and a wait-free interface."

What that means is that node access is 'no wait', meaning when you ask for data you get it, but you do not have an exclusive lock on the data. This is quite different than the mutex based locking model that I'm used to, and at first I didn't see how I could use this to guarantee unique IDs to multiple agents creating multiple items without getting an exclusive lock on the data and making a modification.

What I didn't get (until another colleague walked me through some of his code) is that any and all changes to the data are taken, and versioned. When I request data, I get back an object corresponding to the version of that data. When I submit data, I can specify that the submit will only succeed if the data (and therefore its version) hasn't been updated from the version that I have. So if I get node data that is an integer ID, increment it, and try to write it back, two things can happen (excluding connection loss, which must also be dealt with):
  1. I can return successfully, meaning that my change was accepted because no one else had made changes since I had retrieved the data.
  2. I can get a Bad Version exception, which means I need to get the data again, and try to re-increment the new value.
The code below shows the method that requests, receives, and attempts to increment the data:


public String getNextId() throws Exception {
    String hexId = null;

    // keep trying until a versioned write succeeds
    while(true) {
        ZooKeeper zk = null;
        try {
            // nodeStat is filled in with the version of the data that was read
            Stat nodeStat = new Stat();
            Stat setDataStat = null;
            zk = getZooKeeper();
            byte data[] = getDataWithRetries(zk,
                    sequenceName,
                    nodeStat);
            ByteBuffer buf = ByteBuffer.wrap(data);

            long value = buf.getLong();

            value++;

            buf.rewind();

            buf.putLong(value);

            try {
                setDataStat = setDataWithRetries(
                        zk, sequenceName,
                        buf.array(), nodeStat);
                hexId = Long.toHexString(value);
                break;
            }
            catch(KeeperException e) {
                // BADVERSION means someone else updated the node first:
                // loop around to re-read and re-increment. Anything else is fatal.
                if(!e.code().equals(Code.BADVERSION)) {
                    throw e;
                }
            }
        } finally {
            // always need to close out the session!
            if(zk != null) {
                zk.close();
            }
        }
    }

    return hexId;
}

I've wrapped calls to Zookeeper with a getZooKeeper() method, and pass the retrieved ZooKeeper instance into two methods: getDataWithRetries(), and setDataWithRetries(). Both methods try to recover from connection losses as best they can.

The getDataWithRetries method takes a Zookeeper instance, the path to the node being accessed, and a Stat structure that will contain retrieved data version information. It returns the retrieved data in a byte array. Note how in this method I'm only going to recover from connection losses, because this is a read operation.


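protected byte[] getDataWithRetries(
        ZooKeeper zooKeeper,
        String path,
        Stat nodeStat) throws Exception {

    byte data[] = null;
    boolean succeeded = false;

    int i = 0;

    while(i < RETRY_COUNT) {
        try {
            i++;
            // on success, nodeStat holds the version of the returned data
            data = zooKeeper.getData(path,
                    false,
                    nodeStat);
            succeeded = true;
            break;
        }
        catch(KeeperException e) {
            if(e.code().equals(Code.CONNECTIONLOSS))
            {
                continue;
            }
            else if(e.code().equals(Code.NODEEXISTS))
            {
                break;
            }
            else {
                throw e;
            }
        }
    }
    // only give up if the read never succeeded; a success on the
    // final attempt must not be reported as a connection loss
    if(!succeeded && i >= RETRY_COUNT) {
        throw new KeeperException.ConnectionLossException();
    }

    return data;

}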


Once I have the data, I increment it. Note that Zookeeper data is always kept as a byte array, so I convert it in order to increment it:

ByteBuffer buf = ByteBuffer.wrap(data);

long value = buf.getLong();

value++;

buf.rewind();

buf.putLong(value);

and then try to resubmit it back to Zookeeper. This is where things get interesting. If someone has modified the data before I could get back to it, I need to get the new value of the data and try again. In the setDataWithRetries() method below, I only handle connection exceptions, and blow out if there is a BADVERSION exception:



protected Stat setDataWithRetries(
        ZooKeeper zooKeeper,
        String path,
        byte data[],
        Stat stat) throws Exception {

    int i = 0;
    Stat statFromSet = null;

    while(i < RETRY_COUNT) {
        try {
            i++;
            // the write only succeeds if the node is still at stat's version
            statFromSet = zooKeeper.setData(path,
                    data,
                    stat.getVersion());
            break;
        }
        catch(KeeperException e) {
            if(e.code().equals(Code.CONNECTIONLOSS))
            {
                continue;
            }
            else if(e.code().equals(Code.BADVERSION))
            {
                // differentiating for debug purposes
                throw e;
            }
            else {
                throw e;
            }
        }
    }

    // statFromSet is still null only if every attempt lost its connection
    if(statFromSet == null) {
        throw new KeeperException.ConnectionLossException();
    }

    return statFromSet;

}

The calling code of setDataWithRetries() handles the BADVERSION exception by getting the data again, and retrying the submit:

try {
    setDataStat = setDataWithRetries(zk,
            sequenceName,
            buf.array(),
            nodeStat);
    hexId = Long.toHexString(value);
    break;
}
catch(KeeperException e) {
    if(!e.code().equals(Code.BADVERSION))
    {
        throw e;
    }
}


So each agent tries to get an ID until they succeed, at which point they know they've got a unique one.
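
The read, increment, conditional write, retry cycle can be tried out without a Zookeeper server. The sketch below is a simulation only: VersionedCounterSketch, VersionedNode, writeIfVersion(), and distinctIds() are invented names, and the in-memory node merely imitates the version check that a conditional setData performs. The synchronized keyword stands in for the server applying writes one at a time. The clients never hold a lock between their read and their write.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class VersionedCounterSketch {

    // In-memory stand-in for a single node: a value plus a version
    // that increases on every successful write.
    static class VersionedNode {
        private long value;
        private int version;

        // Returns { value, version } as one consistent snapshot.
        synchronized long[] read() {
            return new long[] { value, version };
        }

        // Applies the write only if the caller's version is still current.
        synchronized boolean writeIfVersion(long newValue, int expectedVersion) {
            if (expectedVersion != version) {
                return false;   // the equivalent of a bad version error
            }
            value = newValue;
            version++;
            return true;
        }
    }

    // Read, increment, attempt a conditional write; retry on a version conflict.
    static long nextId(VersionedNode node) {
        while (true) {
            long[] snapshot = node.read();
            long candidate = snapshot[0] + 1;
            if (node.writeIfVersion(candidate, (int) snapshot[1])) {
                return candidate;
            }
        }
    }

    // Runs several threads against one node and returns how many distinct IDs were handed out.
    static int distinctIds(int threadCount, final int idsPerThread) {
        final VersionedNode node = new VersionedNode();
        final Set<Long> ids = Collections.synchronizedSet(new HashSet<Long>());
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            threads[t] = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < idsPerThread; i++) {
                        ids.add(nextId(node));
                    }
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return ids.size();
    }
}
```

With 4 threads requesting 500 IDs each, distinctIds(4, 500) returns 2000: a write is accepted only when nothing changed since the read, so no two callers can ever be handed the same value.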

The thing I really like about the strong versioning and ordering approach, now that I understand it, is that it acknowledges concurrency and makes it easy to deal with. Locking, on the other hand, seems like an attempt to abstract away the concurrency by enforcing serialization, which works OK when you are managing machine or process local resources, but can have huge performance impacts when you are trying to synchronize access across multiple machines.

The next thing I'm considering using Zookeeper for is configuration changes. Right now I push configuration changes out to my worker nodes by hand and force a restart via their web service interface. I would like them to be able to reload themselves automatically when state changes. This is a step up from the simple code detailed in this post: it means I need to use Zookeeper's notification capabilities to alert listening processes when the configuration changes.

Tuesday, August 11, 2009

Using java.util.concurrent.CountDownLatch to synchronize startup/shutdown

I've been a big fan of the Java concurrency library since I stumbled upon it a while back. Before it came along, I was relegated to writing my own thread pools, schedulers, etc. Which meant, of course, that I was relegated to introducing lots of subtle and deviant bugs into code that had nothing to do with the actual product I was delivering.

The java.util.concurrent library freed me up to go ahead and focus on what I was really trying to deliver instead of re-inventing a hard to write wheel. Plus its authors are way smarter than me about concurrency. I highly recommend reading Java Concurrency In Practice, even if you don't code in Java, because the concurrency issues they discuss are universal, even if the solutions are in Java.

In the latest installment of 'how java.util.concurrent made me a happier, more productive developer', I was implementing a web service layer to control the run state of a set of worker threads. These workers needed to be started/stopped/paused/resumed/{insert favorite action here}. I didn't want to continue processing on the calling thread (the web service start/stop/etc methods) until I was sure that the action requested by the caller had completed across all worker threads, which were running asynchronously.

My first thought was to write a pair of interfaces that allowed me to synchronize when an action was requested and when it was completed. In other words:

public interface Worker {
    public void start(Master master);
    public void stop(Master master);
    public void pause(Master master);
    ..
}

public interface Master {
    public void started(Worker worker);
    public void stopped(Worker worker);
    public void paused(Worker worker);
}

The problem with these interfaces and this design is that every time I needed to add an action to Worker, I needed to add a corresponding 'completed' message to Master. Also, the implementation of Master would need to track each worker against a worker pool, and scan that pool to see if an action was completed. Clearly way too much work to write, let alone understand 3 months later. Also, I knew that this was a pretty common problem, probably solved by the concurrency lib. So I cracked open the book....

java.util.concurrent.CountDownLatch is, in the words of the guys who wrote the library, "a synchronizer that can delay the progress of threads until it reaches its terminal state". Hmm. Using the synchronizer frees me up from having to track the specific kind of state of N specific workers:

public interface Worker {
    public enum RunState {
        STOPPING,
        STOPPED,
        STARTING,
        STARTED,
        PAUSING,
        PAUSED,
        RESTARTING
    };
    public boolean start(TaskCompleted listener) throws Exception;
    public void stop(TaskCompleted listener) throws Exception;
    public void pause(TaskCompleted listener) throws Exception;
    public void restart(TaskCompleted listener) throws Exception;
    public void reload(TaskCompleted listener) throws Exception;
    public RunState getState() throws Exception;
}

public interface TaskCompleted {
public void completed();
}


In the code above,
(1) I no longer care about which action is completed, or which worker completed the action, which means
(2) I no longer am keeping state for X workers in order to return from the call.
(3) The TaskCompleted interface can be implemented as an anonymous class in the response.

The CountDownLatch is pretty simple: await() blocks until its internal count reaches zero:

private void pauseWorkers() throws Exception {
    final CountDownLatch waitForPause = new CountDownLatch(workers.size());
    for(WorkerBase worker : workers) {
        // pause() is declared to throw Exception, so this method must as well
        worker.pause(new TaskCompleted() {
            public void completed() {
                waitForPause.countDown();
            }
        });
    }

    waitForPause.await();

    // and now we're paused.

}

In the code above, the CountDownLatch is initialized to the number of workers I have. I iterate through the list of workers and perform the 'pause' action on them. Then I wait for the latch to get counted down to 0 before proceeding. I am keeping no state on the workers, I only care when they've completed their requested action. I suppose that I could replace the anonymous implementation with an actual (dirt simple) implementation that takes the counter and decrements it.
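
A named listener of that kind might look like the sketch below. The names LatchListenerSketch, LatchCountingListener, and pauseAll() are invented for illustration, the TaskCompleted interface is repeated so that the sketch compiles on its own, and plain threads stand in for the real workers. It also swaps the unbounded await() for the timed variant, which is a different choice from the code above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchListenerSketch {

    // Same shape as the TaskCompleted interface above, repeated so the sketch stands alone.
    interface TaskCompleted {
        void completed();
    }

    // Named replacement for the anonymous class: counts the latch down once per completion.
    static class LatchCountingListener implements TaskCompleted {
        private final CountDownLatch latch;

        LatchCountingListener(CountDownLatch latch) {
            this.latch = latch;
        }

        public void completed() {
            latch.countDown();
        }
    }

    // Simulates workerCount asynchronous workers and waits for all of them.
    // Returns true if every worker reported completion before the timeout.
    static boolean pauseAll(int workerCount) {
        CountDownLatch waitForPause = new CountDownLatch(workerCount);
        final TaskCompleted listener = new LatchCountingListener(waitForPause);
        for (int i = 0; i < workerCount; i++) {
            new Thread(new Runnable() {
                public void run() {
                    // a real worker would pause itself before reporting back
                    listener.completed();
                }
            }).start();
        }
        try {
            // the timed await returns false instead of hanging if a worker never reports
            return waitForPause.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

One listener instance can be shared by all workers because countDown() is thread safe. The timeout is a judgment call: it keeps a web service call from blocking forever on a stuck worker, at the cost of having to decide what to do when it returns false.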

Thursday, July 16, 2009

Upgrading to Eclipse Galileo 3.5 from Ganymede 3.4 on Mac OSX

These are my notes on what I had to do to upgrade to Eclipse Galileo.

Why?
My main motivation was to have 1.6 be my default JDK. With Ganymede I had to set my default JAVA_HOME env var to point to 1.5, and point 1.6 dependent apps -- like my command line mvn builds -- to the (non default) 1.6 JDK. That's exactly the kind of thing I forget 5 minutes after I do it.

What?

Just in case I need to do this again: as far as I could tell, upgrading to a major version of Eclipse currently requires a full, clean install. Which means no associated plugins. So I'm writing down the plugins I need to install, where to get them, etc.

Base Install

I installed the 32 bit Cocoa version of Galileo from http://www.eclipse.org/downloads/download.php?file=/technology/epp/downloads/release/galileo/R/eclipse-jee-galileo-macosx-cocoa.tar.gz

The diff between Cocoa and Carbon and 32 vs 64 bit is explained in detail here.

The tar file unloads to an eclipse directory: make sure you move your old version out of this dir if that's where you have it!

Plugin Installs

These were installed in order (Maven required Subversion):

(1) Subversion Plugin: I followed these instructions to install subclipse.

(2) Maven Plugin: I installed from http://m2eclipse.sonatype.org/update.

Follow Up

(1) I needed to change my JAVA_HOME environment var to point to my 1.6 install (I use soylatte).

(2) I needed to upgrade my subversion client to > 1.4, otherwise I saw an 'unable to launch default SVN client' error when trying to browse my SVN repo. I downloaded the latest svn client, restarted eclipse, and all was well.

Tuesday, June 30, 2009

Running Zookeeper on the Mac with Soylatte and Eclipse 3.4

I've been using Zookeeper to store a sequence number that a large number of processes can access and increment in a coordinated manner.

Zookeeper has a nice, simple interface, and exposes a set of primitives that easily allow me to implement guaranteed synchronized access to my magic sequence number. I'll post more later on the specific solution, but right now I want to detail some of the issues I've run into and the workarounds I've put in place.

I run on a Mac (OSX/Leopard), use Eclipse 3.4 for my Java development, and use soylatte for my JDK. I think a lot of other people run with this setup. I'm using Zookeeper 3.1.3.

My initial setup steps:
  1. I downloaded Zookeeper, untarred it, and installed in /usr/local.
  2. I created a symlink from zookeeper-3.1.1 to zookeeper
  3. From that dir I ran sudo ./bin/zkServer.sh start.

I immediately ran into a strange issue: I could connect to the zookeeper instance:

ZooKeeper zk = new ZooKeeper("127.0.0.1:2181",ZookeeperBase.DEFAULT_TIMEOUT,this);

but could not create a node on it:

zk.create("/HELLO", foo, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

I kept getting timeouts. I've written my code to be 'timeout proof' because connection loss errors are to be expected under load in distributed environments, but I do kick out after 5 retries. Besides, I wouldn't expect to get the ConnectionLoss error when I am connecting to a localhost instance.
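
A minimal sketch of the 'kick out after 5 retries' idea, kept free of the Zookeeper client so that it runs on its own. BoundedRetry, RetriesExhaustedException and the linear backoff are hypothetical names and choices for illustration, not the code from this project. Real code would probably retry only on connection loss rather than on every exception.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: retries an operation a bounded number of times. */
final class BoundedRetry {

    /** Thrown once every attempt has failed; wraps the last failure. */
    static final class RetriesExhaustedException extends Exception {
        RetriesExhaustedException(int attempts, Throwable last) {
            super("gave up after " + attempts + " attempts", last);
        }
    }

    /**
     * Runs op until it succeeds or maxAttempts is reached.
     * Sleeps backoffMillis * attemptNumber between attempts (an assumed policy).
     */
    static <T> T call(Callable<T> op, int maxAttempts, long backoffMillis)
            throws RetriesExhaustedException, InterruptedException {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (InterruptedException e) {
                throw e; // never swallow an interrupt
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt);
                }
            }
        }
        throw new RetriesExhaustedException(maxAttempts, last);
    }
}
```

With this in place, the create call above could be wrapped as BoundedRetry.call(() -> zk.create(...), 5, 200L), where the 200 ms backoff is an arbitrary value.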

It turns out that there have been soylatte nio issues with Zookeeper. I talked to Satish (he's on the email thread in the link, and we both work at Evri), and he said he had success using the latest version of 1.6 that mac 'officially' supports.

I switched to the latest Apple supported java 1.6 version: when I pointed my java binaries at 1.6, Zookeeper worked great, but Eclipse couldn't restart -- some more online research showed that this was another known issue.

So in the end: I
(1) created a java16 symlink to /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java
(2) used that symlink in zkServer.sh
(3) kept my $JAVA_HOME pointing to 1.5 by symlinking /System/Library/Frameworks/JavaVM.framework/Versions/1.5 to /System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK

Saturday, June 27, 2009

Seattle Rock n Roll Marathon Race Report

Well, the big event has come and gone. And I'm happy to have crossed the finish line under my own power! It was a great first marathon experience. To summarize, after not being able to run very much in the last six weeks due to 'life happening' and some late breaking bronchitis, I decided to run the marathon with no particular time goal in mind. However, if I could come close to my original goal of sub 4 hours, that would be gravy :)

I ended up getting 4:10 (according to my watch, which I stopped during a short but necessary bathroom break). The race was going according to sub 4 pace and plan until mile 21, when massive calf cramps in both legs dramatically slowed me down. Cramping of any kind is frustrating, because you have to basically shut it down completely. I wasn't even breathing hard, but I simply could not move any faster. I was able to fight the cramps off for a while, but by mile 25 they were pretty constant, and spreading from my calves to my groin and quads. In the last half mile I was cramping with every step, but I was basically in a tunnel of people at that point, and they cheered me on through the cramps. While it wasn't really 'fun' at the time, I'll never forget struggling down the finishing stretch and the support the crowd gave me that got me through it.

The highlight of my race was seeing my friends Lori and Anthony at the 18.5 mile mark (when I was still feeling pretty good), with their 'Run Arun' sign. Lori was training for the marathon when she injured her knee. In her place I would have been pouting and eating bon-bons on the couch, but she and Anthony came out and cheered us all on -- making the killer sign, giving me pretzels, gatorade and hugs (which must have been pretty gross since I was drenched in sweat). Thanks guys, seeing you both after the long grind uphill was exactly what I needed!

For a first marathon, the Seattle Rock n Roll was perfect. The weather was great, the bands were great (especially for a guy that doesn't train with an iPod), the water stations were perfectly placed, and the course was challenging, with the biggest hill coming on at mile 15-18, running up the false flat of highway 99. For me, the toughest part was running past the turnoff towards the finish on mile 23, and knowing I still had 3 long, cramp filled miles to go. That was mental.

My cramping was probably due to the lack of recent training. I had trained for a specific pace, and kept that pace for the first 21 miles. But the time off caught up with me in the end, and my target pace was probably too fast for my current level of fitness. I guess I'll have to claim a 'moral' victory. And there will always be next year!


Tuesday, June 23, 2009

Streaming Data with a Worker/Agent based approach

Where I was going....
In my last post I described how at work, we were investigating using Hadoop in a non batch setting. I mentioned that despite not using Hadoop's ability to collate keyed data from large data sets, we were still investigating Hadoop because of the built in robustness of the system:
  • Nodes are checked via 'heartbeat'.
  • Task status is centrally tracked.
  • Failed tasks are retried.
  • Work is pulled from the central JobTracker by TaskTrackers.
The basic pain points of maintaining highly available and robust functionality across a cluster of machines are taken care of, which was the primary motivator for us to try and stream data across a batch driven system.

However, as we moved into implementation it became fairly obvious that we were pounding a square peg into a round hole. A lot has been written about how Hadoop and HDFS don't work particularly well with small files -- the recommended solutions usually involve concatenating those files into something bigger to reduce the number of seeks per map job. While these problems were understandable in a system optimized to process huge amounts of data in batch, waiting to batch up large files wasn't an option given the low latency requirement of our end users.

Especially disconcerting was the amount of work (and code) spent bundling queued work items into small files, and submitting those files as individual jobs. The standard worker model -- having multiple processes with multiple threads per process running on multiple machines access SQS and process the data -- seemed so much simpler than creating artificial batches.

A Swift Change of Direction
The rewrite took a matter of hours, dropped out a lot of code, and was a minor change to the overall architecture, which uses SQS to transition between workflow states, and S3 to persist the results of data transformations. The move away from Hadoop was limited to intermediate worker processes -- we still use Hadoop to get the data into the system, because we are collating data across a set of keys when importing data. The latency went from somewhat indeterminate across mini batches to being the average time to process per thread. And the workers were easily subclassed from the Callable class -- developers could implement new workers by overriding a single method that took a string as input. When latency of the system went up, simply adding more machines running more processes would take care of the problem.
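
A rough sketch of what such a worker base class could look like. Everything here is assumed for illustration: the QueueWorker and UpperCaseWorker names, the in-memory BlockingQueue standing in for SQS, and the 100 ms poll interval. The only part taken from the description above is the shape: a Callable whose subclasses override one method that takes a string.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

/** Hypothetical base class: polls a queue and hands each message to process(). */
abstract class QueueWorker implements Callable<Integer> {
    private final BlockingQueue<String> queue; // in-memory stand-in for an SQS queue
    private volatile boolean running = true;

    QueueWorker(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    /** The single method a new worker has to override. */
    protected abstract void process(String message) throws Exception;

    /** Asks the worker to stop after its current poll. */
    void shutdown() {
        running = false;
    }

    /** Runs until shutdown() is called; returns the number of messages handled. */
    @Override
    public Integer call() throws Exception {
        int handled = 0;
        while (running) {
            String message = queue.poll(100, TimeUnit.MILLISECONDS);
            if (message == null) {
                continue; // nothing queued yet, poll again
            }
            process(message);
            handled++;
        }
        return handled;
    }
}

/** Example worker; the upper-casing is a placeholder for a real transform. */
final class UpperCaseWorker extends QueueWorker {
    final List<String> results = new CopyOnWriteArrayList<>();

    UpperCaseWorker(BlockingQueue<String> queue) {
        super(queue);
    }

    @Override
    protected void process(String message) {
        results.add(message.toUpperCase());
    }
}
```

An exception thrown from process() ends call(), which is what makes the worker's Future report as done when something goes wrong.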

Distributed Availability and Retry Logic
Of course, that simplicity came with a price tag -- we lost the distributed bookkeeping that Hadoop provided. Specifically, we would have to implement:
  1. thread and process failure detection
  2. machine failure detection
  3. retry logic
All of which is non trivial to implement. However, our need to stream instead of batch data meant that we would have ended up having to do the retry logic differently than Hadoop anyways. We need to catch and retry data failures at a work item level, not at an arbitrarily determined file split level.

Our retry logic is pretty simple, and uses S3 to persist workflow state per work item. We traverse a list of items in the queue, determine which ones have 'stalled out', and submit them to the appropriate queue as part of a retry. At the same time we clean up work items that have been fully processed, and get average processing time per workflow process. These three things are best done in an asynchronous manner, as -- you guessed it -- Hadoop jobs. They need to take advantage of Hadoop's collation functionality.
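
To make 'stalled out' concrete, here is a small in-memory sketch of the check. The WorkItemState fields and the rule used (not finished, and no state update within a threshold) are assumptions for illustration; the actual state lives in S3 and the actual pass runs as a Hadoop job.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stall check over per-item workflow state. */
final class StallDetector {

    /** Assumed shape of the state persisted for each work item. */
    static final class WorkItemState {
        final String itemId;
        final String stage;           // workflow stage the item was last seen in
        final long lastUpdatedMillis; // when that state was written
        final boolean finished;

        WorkItemState(String itemId, String stage, long lastUpdatedMillis, boolean finished) {
            this.itemId = itemId;
            this.stage = stage;
            this.lastUpdatedMillis = lastUpdatedMillis;
            this.finished = finished;
        }
    }

    /** Returns unfinished items whose state is older than stallAfterMillis. */
    static List<WorkItemState> findStalled(List<WorkItemState> items,
                                           long nowMillis, long stallAfterMillis) {
        List<WorkItemState> stalled = new ArrayList<>();
        for (WorkItemState state : items) {
            boolean tooOld = nowMillis - state.lastUpdatedMillis > stallAfterMillis;
            if (!state.finished && tooOld) {
                stalled.add(state);
            }
        }
        return stalled;
    }
}
```

Each returned item would then be resubmitted to the queue for its stage.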

Our thread failure logic is also pretty simple. Because I'm starting up Callable tasks and making them run until I shut them down, I can check to see if any of them have finished prematurely by calling isDone() on the Futures returned when submitting them to the ExecutorService.
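
A minimal sketch of that isDone() check. WorkerMonitor is a hypothetical name. Since the workers are meant to run until shut down, any Future that reports done while the system is still up is treated as a premature exit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

/** Hypothetical monitor for long-running worker tasks. */
final class WorkerMonitor {

    /**
     * Returns the indexes of futures that are already done.
     * isDone() is true for normal completion, failure, and cancellation alike.
     */
    static List<Integer> finishedEarly(List<? extends Future<?>> futures) {
        List<Integer> done = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            if (futures.get(i).isDone()) {
                done.add(i);
            }
        }
        return done;
    }
}
```

Calling get() on a flagged Future surfaces the original failure as the cause of an ExecutionException, which is useful for logging before restarting the worker.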

Process failure can be monitored (and logged) by a watchdog program. Repeated process failure in this case is symptomatic of an uncaught exception being thrown in one of the process threads.

Machine failure is also easily monitorable. I need to expose a simple service on each machine to detect process and thread failures, and if that process is not reachable, I can assume that the machine is offline.
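
One crude way to do the reachability half of this is a TCP connect with a timeout. This is a sketch under assumptions: MachineProbe is a hypothetical name, and the status service is assumed to listen on a known TCP port. A failed probe can also mean a network problem rather than a dead machine, so it is a hint, not proof.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical probe: can we open a TCP connection to the status service? */
final class MachineProbe {

    /** Returns true if a connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // refused, timed out, or unresolvable: treat the machine as offline
            return false;
        }
    }
}
```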

These may be fairly limited and crude methods of getting a highly available system in place, but they feel like the right primitives to implement because while I don't know why the system is going to fail, each of these methods gives me a way to know how it is failing.

The Conclusion (so far)
The morals of the story at this point are:
  1. Frameworks can be extremely powerful if used for their strengths, and extremely limiting if used for their secondary benefits. When it feels like I'm pounding a square peg into a round hole, I probably am. I think this is called 'design smell', and now that I know what it smells like, I'll start backing up a lot sooner in an effort to find the right tool for the job.
  2. It is always a good sign when a refactoring drops out lots of code.
  3. Having to implement the availability and robustness of the system we are writing has actually made it easier to understand. Even though we are implementing functionality that we once got for free, at least we understand the limitations of the availability and robustness solutions we put in place.

Monday, June 8, 2009

Streaming Data with Hadoop and SQS -- a work in progress

Hadoop is by design a batch oriented system. Take a bunch of data, run a series of maps and reduces with it across X machines, and come back when it's done. A Hadoop cluster has high throughput and high latency. In other words, it takes a while, but a lot of stuff gets done.

At work, I'm leading a team that is implementing a data processing pipeline. The primary use case that we need to address involves quickly handling changes in the entity data we care about -- people, places, and things that act or act upon other people, places and things. Specifically, this means:
  • detect that there has been a change
  • get the changed data into the system
  • apply a series of transforms to it for downline rendering and searching systems
This use case requires a streaming solution. Each piece of data needs to be transformed and pushed into production relatively quickly. Note that there is an acceptable latency. If, for instance, Famous Actor X dies, our system needs to detect it and update the data within the hour. However, detecting/updating data within a day would be too slow to be useful to someone wanting to know what was up with Famous Actor X.

At first glance, Hadoop is not a good fit for this solution. It takes file based inputs and produces file based outputs, so any individual piece of data moving through the system is limited by the speed at which an entire input set can be run through a cluster.

However, Hadoop has the following features that make it ideal for distributing work.
  1. The MapReduce abstraction is a very powerful one that can be applied to many different kinds of data transformation problems.
  2. The primary Mapper and Reducer interfaces are simple enough to allow many different developers to ramp up on the system in minimal time.
  3. HDFS allows the developer to not worry about where they put job data.
  4. Cluster setup and maintenance, thanks to companies like Cloudera (who fixed the issues I was seeing with S3, thanks!), is taken care of.
  5. The logic around distributing the work is completely partitioned away from the business logic that actually comprises the work.
  6. Jobs that fail are re-attempted.
  7. I'm sure there's more..
As our system scales, we can only assume that the number of concurrent inputs and therefore the system load will grow. If we were to take the lowest initial effort route and write our own multithreaded scheduler, we would have a much more straightforward solution.
However, as the workload grows to swamp a single machine, we would eventually end up having to deal with the headaches of distributed computing -- protocol, redundancy, failover/retry, synchronization, etc.

In fact, I was fully intending to write a quick scheduler app to stream data and throw it away when we reached a scale that required distribution, at which point I was going to use Hadoop. However, I soon realized that I would be solving problems that Hadoop already addresses: scheduling, data access, and retry logic. And those were just the initial, non distributed issues.

Still, there is the high latency/inherent batchiness of Hadoop. In order to work around the high latency problem, we're trying to enable streaming between MapReduces via SQS. The input of the entity data into the system and the various transforms of that data can be treated as a series of MapReduce clusters, where the transformation is done during the map, and any necessary collation is done during the reduce. The Reduce phase of each MapReduce can send a notification for the piece of data it has to the next MapReduce cluster.

Of course, it is really inefficient to run MapReduce over a single piece of data at a time. So the listener on each SQS queue buckets incoming messages using a max messages/max time threshold to aggregate data. When a threshold is reached, the system then writes the collected messages to a file that it then starts a MapReduce job on. This is mini batching, and as long as it delivers the data within the specified system latency, it's all good.
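
The max messages/max time bucketing can be sketched like this. MiniBatcher and its method names are hypothetical, timestamps are passed in by the caller to keep the logic easy to test, and the class is not thread safe, so a real listener would need to synchronize access or confine it to one thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical bucketing of queue messages into mini batches. */
final class MiniBatcher {
    private final int maxMessages;
    private final long maxWaitMillis;
    private final List<String> pending = new ArrayList<>();
    private long firstMessageAt; // arrival time of the oldest pending message

    MiniBatcher(int maxMessages, long maxWaitMillis) {
        this.maxMessages = maxMessages;
        this.maxWaitMillis = maxWaitMillis;
    }

    /** Adds a message; returns a batch if the count threshold is hit, else an empty list. */
    List<String> add(String message, long nowMillis) {
        if (pending.isEmpty()) {
            firstMessageAt = nowMillis;
        }
        pending.add(message);
        return pending.size() >= maxMessages ? drain() : Collections.<String>emptyList();
    }

    /** Called periodically; returns a batch if the oldest message has waited long enough. */
    List<String> flushIfExpired(long nowMillis) {
        boolean expired = !pending.isEmpty() && nowMillis - firstMessageAt >= maxWaitMillis;
        return expired ? drain() : Collections.<String>emptyList();
    }

    /** Hands back everything pending and resets the bucket. */
    private List<String> drain() {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }
}
```

Whenever either method returns a non-empty list, that batch is what would be written to a file and submitted as a MapReduce job.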

What we are doing is definitely a work in progress. The reason why is that there are several 'dials' in this process that need to be tuned and tweaked based on the size of the input data.
  1. number of messages/amount of time to wait. Of course, the larger the input files, the more 'efficient' the system. On the other hand, making those files larger may imply an increase in latency.
  2. number of concurrent jobs -- there is a sweet spot here -- it (again) doesn't make sense to launch 100 concurrent jobs. We will need to pay attention to how long a job takes before deciding to adjust the number of concurrent jobs up or down.
  3. number of transformations -- the bucketing required implies that every transform has a built in latency, that factors into the overall latency.
  4. cluster size -- it makes no sense to run 20 nodes where 2 would suffice, but there will be times when 20 is necessary
Some of the dials may be adjustable on the fly -- sending messages to a cluster via an 'admin' queue allows us to change message size/max time/concurrent job numbers dynamically. Other dials may require a stop-reconfig-start of a cluster. One benefit of using SQS is that no messages are lost while we tweak these 'hard stop' dials.

We're not doing a classical MapReduce where we transform data and then collate it under a distinct key. We're doing simple transformations with the data. The reduce piece isn't really needed, because there is no collation to be done. Plenty of people use MapReduce this way, primarily because it allows them to easily decompose their logic into parallel processing units.

We are even going further by undermining one of the key strengths of MapReduce, the ability to operate on large chunks of data, and instead running lots of smaller jobs concurrently. The Hadoop framework makes this possible. I'm not sure how optimal this is, and expect that we will be tweaking the message size and concurrency dials for some time. One of the advantages that the underlying Hadoop framework offers us is flexibility, as evidenced by the kinds of dials we can tweak to get optimal system throughput and latency.

I'll keep updating as we learn more about how the system behaves, and what the 'right' settings of the dials are based on the amount of data being input. I don't know if this is necessarily the most correct or elegant way to build a pipeline, but I do know that it is a good sign the Hadoop framework is this flexible.

Monday, June 1, 2009

logrotate and scripts that just can't let go.

I just found out that a logrotate job I had configured X months ago wasn't working when the lead came up to me and said 'what the $#!k is a 5GB file doing in my /var/log?' He was pissed because this was the second time logrotate had not been configured correctly (both times, my fault). The first time, we discovered logrotate.conf cannot have comments in it. This time, it looked like logrotate had run, but the script had kept the filehandle to the old log file open and was continuing to log to the rotated file.


One way to fix this is to send a HUP to the process in the postrotate script of the logrotate config. This would mean modifying the script to trap the HUP signal, release the filehandle, get a handle to the new (same name) log file, and keep rolling on. I decided not to do this because it involved modifying the original script and I didn't have that much time to relearn things.

The second way, which is what I ended up doing, was to kill and restart the process during postrotate. Here is the config file in /etc/logrotate.d:

/var/log/response_process.log {
daily
missingok
rotate 52
compress
delaycompress
notifempty
create 640 root adm
sharedscripts
postrotate
kill $(ps ax | grep process.rb | grep -v 'grep' | awk '{print $1}')
ruby process.rb -logfile:/var/log/response_process.log > /dev/null 2>&1
endscript
}

Kind of hacky, but I didn't have to expend any mental effort making sure that the filehandle was truly closed in the script.