Thursday, September 10, 2009

Using ExecutorCompletionService to synchronize multithreaded workflow

Today I ran into a problem where I needed to make sure that one multithreaded phase of processing had completely ended before starting another. Specifically, I was retrieving documents from S3 to load into a Lucene index, and wanted to retry all document requests that had failed due to S3 flakiness, connectivity issues, and other standard distributed computing error conditions.

In other situations requiring synchronization between threads, I've used a CountDownLatch. This works really well when you know the exact number of threads that you need to synchronize. You initialize the latch with the number of threads that you are synchronizing. When each one finishes its work it decrements the latch, and when the latch count reaches 0 you continue processing.
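
For reference, here is roughly what that pattern looks like; workerCount and doWork() are placeholders rather than code from this project:

// assume this runs inside a method declared to throw Exception
final CountDownLatch latch = new CountDownLatch(workerCount);

for (int i = 0; i < workerCount; i++) {
    new Thread(new Runnable() {
        public void run() {
            try {
                doWork();              // placeholder for the real work
            } finally {
                latch.countDown();     // decrement the latch, even if the work failed
            }
        }
    }).start();
}

latch.await();  // blocks until the count reaches 0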

This time was different because instead of synchronizing threads, I was trying to halt processing until all asynchronously submitted tasks had completed. I was queuing up several hundred thousand tasks into a thread pool, and did not know when that thread pool would be finished with the work, or how many threads would be running when the entire job completed, or even exactly how many tasks I had to run -- that number depended on the number of documents being fetched, which is always growing.

Fortunately, my situation was not a unique one. I figured that the first place to look was the Java concurrency library, and when I did some research, I found that ExecutorCompletionService was exactly what I needed.

ExecutorCompletionService works with a supplied Executor using Runnable/Callable tasks. I decided to use Callable tasks, as they allowed me to return and inspect a value, and to throw exceptions. As those tasks complete, they are placed onto a queue that can be accessed via the poll() or take() methods. This approach simplified my life:
  1. It allowed me to use tasks and task status to control logic flow instead of threads and thread status. 
  2. It freed me up from having to know how many threads or tasks I was working with.
  3. It provided me with an access point to each task that I could use to analyze the results.
  4. Once I had taken as many completed tasks from the ExecutorCompletionService queue as I had submitted, I knew it was time to retry all failed results. 
Point (1) above is the foundation for the points that follow. When I used Future and Callable to implement the work I needed to do, I was able to return the results I cared about and process them. Specifically, it allowed the code that ran the original key fetch loop to not worry about tracking and storing exceptions, which made for much simpler looping logic.
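
Stripped of the indexing specifics, the basic submit/take pattern looks something like this (Item, Result, items, process(), and handle() are placeholders, not code from this project):

// assume this runs inside a method declared to throw Exception
ExecutorService pool = Executors.newFixedThreadPool(4);
CompletionService<Result> completionService = new ExecutorCompletionService<Result>(pool);

int submitted = 0;
for (final Item item : items) {
    completionService.submit(new Callable<Result>() {
        public Result call() throws Exception {
            return process(item);   // placeholder for the real work
        }
    });
    submitted++;
}

// take() blocks until the next completed task is available,
// in completion order rather than submission order.
for (int i = 0; i < submitted; i++) {
    Future<Result> finished = completionService.take();
    handle(finished.get());         // get() rethrows any exception thrown by call()
}

pool.shutdown();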

ExecutorCompletionService is a great example of how picking the right primitives makes it easy to compose highly functional helper objects. In this case the primitives involved were a (thread pool) executor and a (linked blocking) queue. (Side note: I don't mean to sound like such a concurrency library fanboy, but I'm really happy I didn't have to write this code myself, and really happy that the authors' choice of primitives enabled the creation of classes like ExecutorCompletionService. This stuff used to take significant effort on my part, and significant bugs were usually introduced :)


Here is an example of the ExecutorCompletionService in action; the relevant bits are called out in the comments.

public void buildEntireIndex() throws Exception {

    boolean moreKeys = true;
    int submittedJobCt = 0;

    // tracking all failed keys for later retry
    Map failedKeys = new HashMap();

    // ErrorCapturingExecutor is subclassed from ThreadPoolExecutor.
    // I override ThreadPoolExecutor.afterExecute() to queue and later analyze exceptions.
    LinkedBlockingQueue<Runnable> linkedBlockingQueue = new LinkedBlockingQueue<Runnable>();
    ErrorCapturingExecutor executor = new ErrorCapturingExecutor(threadCount,
        threadCount, 0L, TimeUnit.SECONDS, linkedBlockingQueue);

    // CompletionService works with the supplied executor and queues up tasks as they finish.
    executorCompletionService = new ExecutorCompletionService<BuildResults>(executor);

    String lastKey = null;

    while (moreKeys) {
        Set keys = viewKeyRetriever.retrieveKeys(lastKey);

        if (keys.size() > 0) {
            String[] array = new String[keys.size()];
            keys.toArray(array);
            lastKey = array[keys.size() - 1];

            // I need to keep the number of waiting tasks bounded.
            while (linkedBlockingQueue.size() > MAXQUEUESIZE) {
                Thread.sleep(WAITTIME);
            }

            // this is where I actually submit tasks
            processKeys(keys);

            // I only know how many jobs I need to wait for when I've retrieved all keys.
            submittedJobCt++;
        }
        else {
            moreKeys = false;
        }
    }

    // I use the ExecutorCompletionService queue to check on jobs as they complete.
    for (int i = 0; i < submittedJobCt; i++) {

        Future<BuildResults> finishedJob = executorCompletionService.take();

        // at this point, all I really care about is failures that I need to retry.
        BuildResults results = finishedJob.get();

        if (results.hasFailures()) {
            failedKeys.putAll(results.getFailedKeys());
        }

        indexBuilderMonitor.update(results);
    }

    // I can use failedKeys to retry processing on keys that have failed.
    // Logic omitted for clarity.
    ...

    executor.shutdown();
}

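The ErrorCapturingExecutor itself isn't shown in this post, but a rough sketch of the idea looks like the following. The class name, the errors queue, and getErrors() are illustrative rather than the actual class; afterExecute(Runnable, Throwable) is the standard ThreadPoolExecutor hook, and everything else comes from java.util.concurrent:

public class ErrorCapturingExecutor extends ThreadPoolExecutor {

    // completed-task failures are queued here for later analysis
    private final Queue<Throwable> errors = new ConcurrentLinkedQueue<Throwable>();

    public ErrorCapturingExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        // Tasks submitted as Callables arrive here wrapped in a Future, and any
        // exception they threw is held inside that Future rather than passed as t.
        if (t == null && r instanceof Future<?>) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException e) {
                t = e;
            } catch (ExecutionException e) {
                t = e.getCause();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        if (t != null) {
            errors.add(t);
        }
    }

    public Queue<Throwable> getErrors() {
        return errors;
    }
}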

The processKeys method is shown below. I broke it out because I needed to call it again when re-processing failed keys:

private void processKeys(Set keys) {
    // the builder builds the index from the content retrieved using the passed in keys.
    final IndexBuilder builder = indexBuilderFactory.createBuilder(keys);

    executorCompletionService.submit(new Callable<BuildResults>() {
        @Override
        public BuildResults call() throws Exception {
            BuildResults buildResults = null;
            try {
                // buildResults contains all information I need to post process the task.
                buildResults = builder.build();
            } catch (Exception e) {
                throw e; // caught by ErrorCapturingExecutor
            }

            return buildResults;
        }
    });
}
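
The retry pass itself is omitted from buildEntireIndex above, but one hypothetical way to wire it up (reusing processKeys and the same drain loop) might look like this:

// hypothetical retry pass -- not shown in the original code above
if (!failedKeys.isEmpty()) {
    // resubmit just the keys that failed the first time around
    processKeys(failedKeys.keySet());

    Future<BuildResults> retriedJob = executorCompletionService.take();
    BuildResults retryResults = retriedJob.get();
    indexBuilderMonitor.update(retryResults);
}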
