Tuesday, August 11, 2009

Using java.util.concurrent.CountDownLatch to synchronize startup/shutdown

I've been a big fan of the Java concurrency library since I stumbled upon it a while back. Before it came along, I was stuck writing my own thread pools, schedulers, etc., which meant, of course, that I was introducing lots of subtle and devious bugs into code that had nothing to do with the actual product I was delivering.

The java.util.concurrent library freed me up to focus on what I was really trying to deliver instead of re-inventing a hard-to-write wheel. Plus, its authors are way smarter than me about concurrency. I highly recommend reading Java Concurrency in Practice, even if you don't code in Java, because the concurrency issues it discusses are universal, even if the solutions are in Java.

In the latest installment of 'how java.util.concurrent made me a happier, more productive developer', I was implementing a web service layer to control the run state of a set of worker threads. These workers needed to be started/stopped/paused/resumed/{insert favorite action here}. I didn't want to continue processing on the calling thread (the web service start/stop/etc. methods) until I was sure that the action requested by the caller had completed across all worker threads, which run asynchronously.

My first thought was to write a pair of interfaces that allowed me to synchronize when an action was requested and when it was completed. In other words:

public interface Worker {
    public void start(Master master);
    public void stop(Master master);
    public void pause(Master master);
    // ..
}

public interface Master {
    public void started(Worker worker);
    public void stopped(Worker worker);
    public void paused(Worker worker);
}

The problem with these interfaces and this design is that every time I needed to add an action to Worker, I needed to add a corresponding 'completed' message to Master. Also, the implementation of Master would need to track each worker against a worker pool, and scan that pool to see if an action was completed. Clearly way too much work to write, let alone understand 3 months later. Also, I knew that this was a pretty common problem, probably solved by the concurrency lib. So I cracked open the book....
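For a sense of that bookkeeping, here is a rough sketch of what a Master might have needed for the pause action alone. It is not code from the project: the class name, the methods, and the use of String ids in place of Worker references are all assumptions made to keep the example small.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the rejected callback design, reduced to "pause".
// Workers are identified by String ids here purely for brevity.
class PauseTrackingMaster {
    // The pool the master has to scan: workers that have not reported back yet.
    private final Set<String> pending = new HashSet<String>();

    // Record a worker we expect a "paused" callback from.
    synchronized void expect(String workerId) {
        pending.add(workerId);
    }

    // Callback a worker would invoke once it has paused.
    synchronized void paused(String workerId) {
        pending.remove(workerId);
        if (pending.isEmpty()) {
            notifyAll(); // wake up anyone blocked in awaitAllPaused()
        }
    }

    // Block the caller until every expected worker has reported in.
    synchronized void awaitAllPaused() throws InterruptedException {
        while (!pending.isEmpty()) {
            wait();
        }
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

Every additional action (start, stop, restart, ...) would need its own pending set, callback, and wait method along these lines, which is the duplication described above.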

java.util.concurrent.CountDownLatch is, in the words of the guys who wrote the library, "a synchronizer that can delay the progress of threads until it reaches its terminal state". Hmm. Using the synchronizer frees me up from having to track the specific kind of state of N specific workers:
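Before getting to the worker interfaces, a minimal sketch of the latch on its own may help. The class and method names below are made up for illustration; only CountDownLatch, countDown() and await() come from the library. The latch starts at a count, each thread counts it down once, and await() returns only when the count has reached zero (the "terminal state").

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names here are illustrative, not from the post.
class LatchBasics {

    // Starts `threads` threads, blocks until all of them have counted the
    // latch down, and returns how many had finished when await() returned.
    static int runAndWait(int threads) throws InterruptedException {
        final CountDownLatch done = new CountDownLatch(threads);
        final AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < threads; i++) {
            new Thread(new Runnable() {
                public void run() {
                    finished.incrementAndGet(); // stand-in for real work
                    done.countDown();           // one step closer to zero
                }
            }).start();
        }

        done.await(); // blocks until the count reaches zero
        return finished.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("finished: " + runAndWait(4));
    }
}
```

Because each thread increments the counter before it counts down, the value read after await() equals the number of threads started.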

public interface Worker {
    public enum RunState {
        STOPPING,
        STOPPED,
        STARTING,
        STARTED,
        PAUSING,
        PAUSED,
        RESTARTING
    }
    public boolean start(TaskCompleted listener) throws Exception;
    public void stop(TaskCompleted listener) throws Exception;
    public void pause(TaskCompleted listener) throws Exception;
    public void restart(TaskCompleted listener) throws Exception;
    public void reload(TaskCompleted listener) throws Exception;
    public RunState getState() throws Exception;
}

public interface TaskCompleted {
    public void completed();
}


In the code above,
(1) I no longer care about which action is completed, or which worker completed the action, which means
(2) I am no longer keeping state for X workers in order to return from the call.
(3) The TaskCompleted interface can be implemented as an anonymous class in the response.

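The CountDownLatch is pretty simple: await() blocks until its internal count reaches zero:

// Declared as "throws Exception" because Worker.pause is declared to throw Exception.
private void pauseWorkers() throws Exception {
    // One count per worker; each worker counts down once when it has paused.
    final CountDownLatch waitForPause = new CountDownLatch(workers.size());
    for (WorkerBase worker : workers) {
        worker.pause(new TaskCompleted() {
            public void completed() {
                waitForPause.countDown();
            }
        });
    }

    // Block the calling thread until every worker has reported completion.
    waitForPause.await();

    // and now we're paused.
}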

In the code above, the CountDownLatch is initialized to the number of workers I have. I iterate through the list of workers and perform the 'pause' action on them. Then I wait for the latch to get counted down to 0 before proceeding. I am keeping no state on the workers; I only care when they've completed their requested action. I suppose that I could replace the anonymous implementation with an actual (dirt simple) implementation that takes the counter and decrements it.
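That named implementation might look something like the sketch below. LatchCountDown, ToyWorker and PauseDemo are hypothetical names, and ToyWorker is only a stand-in for a real worker so that the example runs on its own. One deliberate difference from the code above: this sketch uses the timed await(long, TimeUnit) variant, so a worker that never calls back cannot hang the calling thread forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Same shape as the TaskCompleted interface in the post.
interface TaskCompleted {
    void completed();
}

// The "dirt simple" named implementation: it takes the latch and counts it down.
class LatchCountDown implements TaskCompleted {
    private final CountDownLatch latch;

    LatchCountDown(CountDownLatch latch) {
        this.latch = latch;
    }

    public void completed() {
        latch.countDown();
    }
}

// Hypothetical stand-in for a real worker: it "pauses" on its own thread
// and then notifies the listener.
class ToyWorker {
    private volatile boolean paused = false;

    void pause(final TaskCompleted listener) {
        new Thread(new Runnable() {
            public void run() {
                paused = true;
                listener.completed();
            }
        }).start();
    }

    boolean isPaused() {
        return paused;
    }
}

class PauseDemo {
    // Returns true if every worker reported completion within the timeout,
    // false if the timeout elapsed first.
    static boolean pauseAll(List<ToyWorker> workers, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch waitForPause = new CountDownLatch(workers.size());
        // One listener instance can be shared, since it carries no per-worker state.
        TaskCompleted listener = new LatchCountDown(waitForPause);
        for (ToyWorker worker : workers) {
            worker.pause(listener);
        }
        return waitForPause.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        List<ToyWorker> workers = new ArrayList<ToyWorker>();
        for (int i = 0; i < 3; i++) {
            workers.add(new ToyWorker());
        }
        System.out.println("all paused: " + pauseAll(workers, 1000));
    }
}
```

Since the listener no longer needs to know which worker called it, a single LatchCountDown can be handed to every worker, which is the same "no per-worker state" property the anonymous class had.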

2 comments:

  1. The cost of synchronization now is mostly the cost of a contested lock: if Thread-1 tries to get a lock that Thread-2 already holds, then Thread-1 has to wait for Thread-2 to release it. So there has been work done recently to reduce the need to exclusively lock other threads out of blocks of code: things like the fixed volatile semantics, the concurrent collection implementations inside the java.util.concurrent package, and the more flexible locks inside the java.util.concurrent.locks package.

  2. This comment has been removed by a blog administrator.