Archives for January 2013

Adventures in Clustering – part 1

Last year I added clustering support to a system I had previously developed for a client. The requirements were to implement automated failover to eliminate a single point of failure and to distribute certain kinds of work among members of the cluster.

The application I was changing is a message delivery system, but my client has other applications with the same needs so the solution needed to be general enough to apply elsewhere.

Automating Failover with Leader Election

The message delivery system runs a number of scheduled jobs. For example, one of the scheduled jobs checks the database to see which new messages need to be delivered, delivers them, and updates message statuses after the delivery is complete. The message delivery system runs on JVMs on multiple machines for availability, but only one JVM should run the scheduled job; otherwise duplicate messages will be sent.

Before clustering support was added, one of the JVMs was designated as the Delivery Server using a Java system property, and only the Delivery Server ran the scheduled jobs. If the Delivery Server went down, one of the other JVMs had to be manually converted into the Delivery Server. This was suboptimal because it depended on humans being available both to notice the problem and to perform the failover, and the failover process was error-prone.

There are a number of ways to solve the problem for the specific use case I just described. But there were other use cases where manual failover was also the problem, both in the message delivery system and in other applications the client has. I didn’t want to have a use case-specific solution for each problem.

To solve the problem generally I decided to use Leader Election. With Leader Election cluster members decide among themselves which member is the leader and the leader is responsible for certain tasks. The message delivery system already had the concept of a leader – the Delivery Server. I just needed to automate the process of choosing that leader.

The ClusterService

To support the Leader Election and work distribution features, I introduced the concept of a cluster service. When the service is initialized it starts the leader election process for that cluster member. At any time it can be queried to see if the current node is the leader, and who the other members of the cluster are. Here is the ClusterService interface:

In ClusterStatus, current, leader and participants are node IDs.
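As a rough illustration of the shape described above, here is a minimal sketch in plain Java. It is not the original listing: the method names `enabled`, `initialize` and `clusterStatus` and the three `ClusterStatus` fields are taken from the text of this post, while the types, the `isLeader()` helper and the choice of `String` for node IDs are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Snapshot of the cluster as seen by one member; every field holds node IDs. */
final class ClusterStatus {
    final String current;            // ID of the node that answered the query
    final String leader;             // ID of the elected leader, or null if none is known
    final List<String> participants; // IDs of all known cluster members

    ClusterStatus(String current, String leader, List<String> participants) {
        this.current = current;
        this.leader = leader;
        // Defensive copy so the snapshot cannot change after it is created.
        this.participants = Collections.unmodifiableList(new ArrayList<>(participants));
    }

    /** Hypothetical convenience: true when the answering node is the leader. */
    boolean isLeader() {
        return current.equals(leader);
    }
}

/** Assumed shape of the service; the method names follow the prose of the post. */
interface ClusterService {
    boolean enabled();             // whether clustering is switched on
    void initialize();             // joins the cluster and starts leader election
    ClusterStatus clusterStatus(); // may be queried at any time
}
```

Keeping the status as an immutable snapshot means a caller can check leadership and read the participant list from the same consistent view.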

Scheduled Jobs

Previously only the Delivery Server ran scheduled jobs. With Leader Election all of the cluster members run the scheduled jobs, but those jobs were changed to return immediately if the current cluster member is not the leader. For example:

The distributeEvents job runs every 30 seconds on all cluster members. It gets the cluster status from the ClusterService and if the current node is the leader it calls distribute() to do the actual work.
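The guard pattern can be sketched with nothing but the JDK. This is an illustration rather than the system's actual job code: `LeaderOnlyJob` and its `schedule` helper are hypothetical names, and a `BooleanSupplier` stands in for the "am I the leader?" query against the ClusterService.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Wraps a job so that it only does real work on the current leader. */
final class LeaderOnlyJob implements Runnable {
    private final BooleanSupplier isLeader; // stands in for a cluster status query
    private final Runnable work;            // e.g. the distribute() call

    LeaderOnlyJob(BooleanSupplier isLeader, Runnable work) {
        this.isLeader = isLeader;
        this.work = work;
    }

    @Override
    public void run() {
        // Leadership is re-checked on every run, so a failover takes effect
        // on the next tick without touching the scheduler.
        if (!isLeader.getAsBoolean()) {
            return;
        }
        work.run();
    }

    /**
     * Schedules the guarded job at a fixed rate on every cluster member.
     * Note: scheduleAtFixedRate stops rescheduling a task that throws,
     * so real work should handle its own exceptions.
     */
    static ScheduledExecutorService schedule(LeaderOnlyJob job, long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(job, 0, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

With this shape, a call such as `LeaderOnlyJob.schedule(job, 30)` would match the 30-second interval mentioned above, and the caller is responsible for shutting the returned scheduler down.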

Work Distribution

In the message delivery system the ClusterService is also used for work distribution. The leader distributes certain pieces of work to all of the nodes in the cluster. The ClusterService is queried for the cluster members, and each cluster member's node ID is mapped to a remote Akka actor. For example:
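One way to picture this is the sketch below, which is not the system's actual code. It assumes a simple round-robin policy (the post does not say which policy is used) and uses the node ID only as a map key; in the real system each node ID would be resolved to a remote actor before the work is sent. `WorkDistributor` and `assign` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class WorkDistributor {
    /**
     * Assigns work items to cluster members round-robin.
     * Returns a map from node ID to the items that node should process;
     * every participant gets an entry, even if its list ends up empty.
     */
    static <T> Map<String, List<T>> assign(List<String> participants, List<T> items) {
        if (participants.isEmpty()) {
            throw new IllegalArgumentException("cluster has no participants");
        }
        Map<String, List<T>> plan = new LinkedHashMap<>();
        for (String nodeId : participants) {
            plan.put(nodeId, new ArrayList<T>());
        }
        for (int i = 0; i < items.size(); i++) {
            // Item i goes to participant i modulo the cluster size.
            String nodeId = participants.get(i % participants.size());
            plan.get(nodeId).add(items.get(i));
        }
        return plan;
    }
}
```

Because the participant list is read fresh from the cluster status each time, a node that has left the cluster simply stops receiving work on the next round.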

I will cover the work distribution system in detail in a separate post.

Zookeeper and Curator

I wanted to leverage a third-party clustering system as developing core clustering functionality is non-trivial. My first choice was Akka Cluster, but when I was adding clustering support to the message delivery system Akka Cluster had not been released (it was just a high-level design doc at that time). Zookeeper, on the other hand, had been on the scene for a while. The Zookeeper client has a reputation for being difficult to work with, so I decided to use Curator, a library that abstracts over the Zookeeper Java client.


ZookeeperService is the Curator-based implementation of the ClusterService:

ZookeeperService also has an embedded Zookeeper server which will be covered in part 2.

Leader Election using Curator

Curator has two ways of doing Leader Election: the LeaderSelector and LeaderLatch.

I first implemented leader election using LeaderSelector but I was unsatisfied with how complicated the resulting code was. After some discussion with Jordan Zimmerman, Curator’s developer, I reimplemented leader election using LeaderLatch. Here’s what it looks like:

The call to watchLeaderChildren() is optional. It’s only needed if you want to be notified when the leader changes or if a cluster member falls out of the cluster. In the message delivery system that wasn’t strictly necessary because it always checks who the leader is before doing something that only the leader should do. But it’s a nice thing to have for monitoring purposes:

In watchLeaderChildren a watch is set on the children of the LeaderLatch znode. Each child represents a cluster member and the first znode in the list of children is the leader. If the set of children changes the watch is fired and the process() method is called. In process() the cluster status is queried and the watch is set again (Zookeeper watches are one-shot: once a watch fires it is removed and must be registered again to receive further notifications).
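The re-arming behaviour can be modelled without a Zookeeper server. The sketch below is an in-memory stand-in, not Curator code: `LatchNode` and `LeaderWatcher` are hypothetical classes, the `member-` prefix on child names is made up, and "first child" is assumed to mean the child with the lowest sequence suffix (Zookeeper sequential nodes end in a zero-padded counter).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** In-memory stand-in for a parent znode whose children are latch entries. */
final class LatchNode {
    private final List<String> children = new ArrayList<>();
    private Runnable watch; // at most one pending watch; cleared when it fires

    /** Returns a copy of the current children and registers a one-shot watch. */
    List<String> getChildren(Runnable watcher) {
        this.watch = watcher;
        return new ArrayList<>(children);
    }

    void add(String child) { children.add(child); fire(); }
    void remove(String child) { children.remove(child); fire(); }

    private void fire() {
        Runnable w = watch;
        watch = null; // one-shot: the watch is gone once it has been triggered
        if (w != null) w.run();
    }
}

final class LeaderWatcher {
    private final LatchNode node;
    final List<String> observedLeaders = new ArrayList<>(); // one entry per read

    LeaderWatcher(LatchNode node) { this.node = node; }

    /** Reads the children, records the leader, and re-arms the watch. */
    void watchLeaderChildren() {
        List<String> children = node.getChildren(this::watchLeaderChildren);
        observedLeaders.add(leaderOf(children));
    }

    /** The child with the lowest sequence suffix is treated as the leader. */
    static String leaderOf(List<String> children) {
        if (children.isEmpty()) return null;
        return Collections.min(children, Comparator.comparingLong(LeaderWatcher::sequence));
    }

    private static long sequence(String child) {
        return Long.parseLong(child.substring(child.lastIndexOf('-') + 1));
    }
}
```

If the watcher did not pass itself back into `getChildren`, it would see the first membership change and then go silent, which is the failure mode the re-registration in process() avoids.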

Cluster Status using Curator

ZookeeperService.clusterStatus looks like:

It is a straightforward query of the LeaderLatch to populate the ClusterStatus object.
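To make the mapping concrete, here is a hedged sketch in plain Java. It does not call Curator: the `Member` class is a stand-in that mirrors the ID and is-leader pair that a LeaderLatch participant exposes, and `StatusBuilder` is a hypothetical helper. `ClusterStatus` has the same three fields described earlier in this post.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Stand-in for a latch participant: a node ID plus a leadership flag. */
final class Member {
    final String id;
    final boolean leader;

    Member(String id, boolean leader) {
        this.id = id;
        this.leader = leader;
    }
}

/** Same shape as described in the post: all three fields are node IDs. */
final class ClusterStatus {
    final String current;
    final String leader; // null while no leader has been elected
    final List<String> participants;

    ClusterStatus(String current, String leader, List<String> participants) {
        this.current = current;
        this.leader = leader;
        this.participants = Collections.unmodifiableList(new ArrayList<>(participants));
    }
}

final class StatusBuilder {
    /** Builds a status snapshot for the given node from the latch participants. */
    static ClusterStatus clusterStatus(String currentId, List<Member> members) {
        String leader = null;
        List<String> ids = new ArrayList<>();
        for (Member m : members) {
            ids.add(m.id);
            if (m.leader) {
                leader = m.id; // at most one participant is expected to be flagged
            }
        }
        return new ClusterStatus(currentId, leader, ids);
    }
}
```

Leaving `leader` as null when no participant is flagged lets callers distinguish "not the leader" from "no leader yet", for example during startup.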

Application Startup

ClusterService.initialize is called at application startup if ClusterService.enabled is true. Here is the ZookeeperService implementation:

CuratorFramework is the high-level Curator API. It wraps a Curator Client, which in turn wraps the Zookeeper client. After the CuratorFramework is created the startup code blocks until the client has connected to the server and is ready to start working. Then selectLeader() is called to start the leader election process. Once the leader election process is started, Curator handles everything else under the hood (for example, if the leader dies, a new node joins the cluster, or one of the Zookeeper servers goes down).
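The ordering described above (connect, wait, then start the election) can be sketched with a JDK `CountDownLatch`. This is an illustration of the startup sequence only, under the assumption that the client signals when its connection is ready; `ClusterStartup`, `onConnected` and the boolean return value are hypothetical, and `selectLeader()` here is a placeholder rather than a call into Curator.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class ClusterStartup {
    private final CountDownLatch connected = new CountDownLatch(1);
    private volatile boolean electionStarted = false;

    /** Called by the (hypothetical) client once the connection is established. */
    void onConnected() {
        connected.countDown();
    }

    /**
     * Blocks until connected or the timeout elapses, then starts leader election.
     * Returns false if the connection was not ready in time or the wait was interrupted.
     */
    boolean initialize(long timeout, TimeUnit unit) {
        try {
            if (!connected.await(timeout, unit)) {
                return false; // the caller decides whether to retry or abort startup
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        selectLeader();
        return true;
    }

    /** Placeholder for starting the election; the real code would start the latch. */
    private void selectLeader() {
        electionStarted = true;
    }

    boolean electionStarted() {
        return electionStarted;
    }
}
```

Using a timeout rather than waiting forever keeps application startup from hanging silently when no Zookeeper server is reachable.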

Coming Up

In the next posts in this series I will embed the Zookeeper server into the application, handle a corner case when leadership changes while the leader is working, discuss work distribution in detail, and talk about a port of the clustering functionality to Akka Cluster. Stay tuned!