In other situations requiring synchronization between threads, I've used a CountDownLatch. This works really well when you know the exact number of threads that you need to synchronize. You initialize the latch with the number of threads being synchronized; each thread decrements the latch when it finishes its work, and when the latch count reaches 0, you continue processing.
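For contrast, that latch pattern looks roughly like this (a minimal sketch; the class name and the placeholder worker body are mine, not from any real codebase):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchDemo {

    // Returns true once every worker has counted down, false on timeout.
    static boolean runWorkers(int workerCount) throws InterruptedException {
        // The latch must be initialized with the exact worker count up front.
        CountDownLatch latch = new CountDownLatch(workerCount);
        for (int i = 0; i < workerCount; i++) {
            new Thread(() -> {
                // ... do this worker's share of the job ...
                latch.countDown(); // one fewer thread to wait for
            }).start();
        }
        // Blocks until the count reaches 0 (or the timeout elapses).
        return latch.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWorkers(3)); // prints "true"
    }
}
```

The key limitation is right there in the constructor: you need the worker count before any work starts.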
This time was different because instead of synchronizing threads, I was trying to halt processing until all asynchronously submitted tasks had completed. I was queuing up several hundred thousand tasks into a thread pool, and did not know when that thread pool would be finished with the work, how many threads would be running when the entire job completed, or even exactly how many tasks I had to run -- that number depended on the number of documents being fetched, which is always growing.
Fortunately my situation was not a unique one. I figured that the first place to look was the Java concurrency library (java.util.concurrent), and when I did some research, I found that ExecutorCompletionService was exactly what I needed.
ExecutorCompletionService works with a supplied Executor using Runnable/Callable tasks. I decided to use Callable tasks, as they allowed me to return and inspect a value, and throw exceptions. As those tasks complete, they are placed onto a queue that can be accessed via the poll() or take() methods. This approach simplified my life:
- It allowed me to use tasks and task status to control logic flow instead of threads and thread status.
- It freed me up from having to know how many threads or tasks I was working with.
- It provided me with an access point to each task that I could use to analyze the results.
- When the ExecutorCompletionService queue was empty, I knew it was time to then retry all failed results.
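Stripped of my indexing specifics, the basic submit-then-drain pattern looks like this (an illustrative sketch with made-up squaring tasks; the class and method names are mine):

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CompletionDemo {

    // Submit n squaring tasks, then drain the results in completion order.
    static int sumOfSquares(int n) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<Integer> completionService =
                new ExecutorCompletionService<>(pool);

        for (int i = 1; i <= n; i++) {
            final int value = i;
            completionService.submit(() -> value * value); // Callable<Integer>
        }

        int sum = 0;
        for (int i = 0; i < n; i++) {
            // take() blocks until the next task finishes, whichever that is;
            // poll() would instead return null when nothing has completed yet.
            Future<Integer> finished = completionService.take();
            sum += finished.get(); // get() also rethrows any task exception
        }
        pool.shutdown();
        return sum;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(10)); // prints 385
    }
}
```

Notice that the drain loop only counts submissions; it never asks how many threads ran or which task finished first.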
ExecutorCompletionService is a great example of how picking the right primitives makes it easy to compose highly functional helper objects. In this case the primitives involved were a (thread pool) executor and a (linked blocking) queue. (side note: I don't mean to sound like such a concurrent lib fanboy, but I'm really happy I didn't have to write this code myself, and really happy that the choice of primitives that the authors used enabled creation of classes like the ExecutorCompletionService. This stuff used to take significant effort on my part, and significant bugs were usually introduced :)
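To make the composition concrete: this is not the JDK source, but the idea can be sketched in a few lines by wrapping each Callable in a FutureTask whose done() hook pushes it onto a shared blocking queue (class name and details are mine):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

// A stripped-down sketch of the idea behind ExecutorCompletionService:
// an executor runs the tasks, and a blocking queue collects them as they finish.
class MiniCompletionService<V> {
    private final Executor executor;
    private final BlockingQueue<Future<V>> completed = new LinkedBlockingQueue<>();

    MiniCompletionService(Executor executor) {
        this.executor = executor;
    }

    Future<V> submit(Callable<V> task) {
        FutureTask<V> futureTask = new FutureTask<V>(task) {
            @Override
            protected void done() {
                completed.add(this); // runs when the task completes (or fails)
            }
        };
        executor.execute(futureTask);
        return futureTask;
    }

    Future<V> take() throws InterruptedException {
        return completed.take(); // blocks until some task has finished
    }
}
```

Two well-chosen primitives, maybe thirty lines of glue.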
Here is an example of the ExecutorCompletionService in action, with the relevant bits called out in comments.
```java
public void buildEntireIndex() throws Exception {
    boolean moreKeys = true;
    int submittedJobCt = 0;
    // tracking all failed keys for later retry
    Map<String, Exception> failedKeys = new HashMap<String, Exception>();
    // ErrorCapturingExecutor is subclassed from ThreadPoolExecutor.
    // I override ThreadPoolExecutor.afterExecute() to queue and later analyze exceptions.
    LinkedBlockingQueue<Runnable> linkedBlockingQueue = new LinkedBlockingQueue<Runnable>();
    ErrorCapturingExecutor executor = new ErrorCapturingExecutor(threadCount, threadCount,
            0L, TimeUnit.SECONDS, linkedBlockingQueue);
    // CompletionService works with the supplied executor and queues up tasks as they finish.
    executorCompletionService = new ExecutorCompletionService<BuildResults>(executor);

    String lastKey = null;
    while (moreKeys == true) {
        Set<String> keys = viewKeyRetriever.retrieveKeys(lastKey);
        if (keys.size() > 0) {
            String array[] = new String[keys.size()];
            keys.toArray(array);
            lastKey = array[keys.size() - 1];
            // I need to keep the number of waiting tasks bounded.
            if (linkedBlockingQueue.size() > MAXQUEUESIZE) {
                Thread.sleep(WAITTIME);
            }
            // this is where I actually submit tasks
            processKeys(keys);
            // I only know how many jobs I need to wait for when I've retrieved all keys.
            submittedJobCt++;
        } else {
            moreKeys = false;
        }
    }

    // I use the ExecutorCompletionService queue to check on jobs as they complete.
    for (int i = 0; i < submittedJobCt; i++) {
        Future<BuildResults> finishedJob = executorCompletionService.take();
        // at this point, all I really care about is failures that I need to retry.
        BuildResults results = finishedJob.get();
        if (results.hasFailures()) {
            failedKeys.putAll(results.getFailedKeys());
        }
        indexBuilderMonitor.update(results);
    }

    // I can use failedKeys to retry processing on keys that have failed.
    // Logic omitted for clarity.
    ...

    executor.shutdown();
}
```
The processKeys method is shown below. I broke it out because I needed to call it again when reprocessing failed keys:
```java
private void processKeys(Set<String> keys) {
    // the builder builds the index from the content retrieved using the passed in keys.
    final IndexBuilder builder = indexBuilderFactory.createBuilder(keys);
    executorCompletionService.submit(new Callable<BuildResults>() {
        @Override
        public BuildResults call() throws Exception {
            BuildResults buildResults = null;
            try {
                // buildResults contains all information I need to post process the task.
                buildResults = builder.build();
            } catch (Exception e) {
                throw e; // caught by ErrorCapturingExecutor
            }
            return buildResults;
        }
    });
}
```
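The ErrorCapturingExecutor itself isn't shown above, so here's a hypothetical sketch of what such a ThreadPoolExecutor subclass might look like (the actual hook is named afterExecute(), and its details here are my assumptions, not the code from my project). One subtlety: for submitted Callables the exception is trapped inside the Future, so the Throwable argument is null and you have to unwrap it with get().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: collect task exceptions by overriding afterExecute.
class ErrorCapturingExecutor extends ThreadPoolExecutor {
    private final BlockingQueue<Throwable> errors = new LinkedBlockingQueue<>();

    ErrorCapturingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                           TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get(); // surfaces any exception the Callable threw
            } catch (ExecutionException e) {
                errors.add(e.getCause());
            } catch (CancellationException e) {
                errors.add(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else if (t != null) {
            errors.add(t); // raw Runnable that threw directly
        }
    }

    BlockingQueue<Throwable> capturedErrors() {
        return errors;
    }
}
```

The errors queue can then be drained after shutdown to analyze what went wrong, alongside the failed-key retry logic above.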