Distributed Systems links

pg-distrib-logoA few years ago, while researching Zookeeper for a project I was working on, I realized that there was a whole field Computer Science, Distributed Systems, that I was totally unfamiliar with. That started a journey of discovery that’s been very rewarding.   In response to a question on the Akka mailing list I put together a list of links to Distributed Systems resources.  I’ve been meaning to translate that email to a blog post for a while.

To start off I would definitely recommend checking out a talk that Jonas Boner from Typesafe gave at Strange Loop called The Road to Akka Cluster and Beyond (slides).

Implementation-oriented books that I would recommend for developers are:

These are all filled with practical advice for building real-world distributed systems.

One thing I found is that there is a big gap between academic and industry knowledge right now.  This is discussed in a post on Henry Robinson’s excellent Paper Trail blog where he provides a guide to digging deeper both on the academic side and by reading research papers written by industry leaders like Google, Yahoo, etc.   Definitely read the links in the “First Steps” section.  The gap is also the topic of a post on Marc Brooker’s blog and a post on Murat’s blog.  Besides papers he links to some other good people to follow like Aphyr and Peter Bailis.  Two blogs that review Distributed Systems papers are the Morning Paper and MetaData.  I also recommend following Brave New Geek, Ben Stopford and Kellabyte, and the Hacking, Distributed, High Scalability and Highly Scalable blogs.

Papers We Love is a collective of meetups across the world where people present their takes on research papers that they find fascinating.  Their web site has videos taken at these meetups.  They also hold yearly conferences.

YouTubers are also getting into the act – for example, Vivek Haldar has a series of videos called Read a Paper where he summarizes papers in around ten minutes.

Many times the conferences where the papers are presented also publish videos, slide decks and posters that are much easier to consume for a working developer.  If you have a paper that you are really interested in be sure and and check out the web site of the conference where the paper was published.  Usenix in particular is really good at this.  In addition in the last few years a number of research projects have been creating web sites to promote the research where you can find code, videos and more.  For example, check out the site for Hermes, a replication protocol.

Working to fill the gap between academia and industry:

Essential ACM Queue articles

Notable blog posts

Other reading lists

Online Courses

I recommend getting familiar with the CAP Theorem.  You’re going to run into it all over the place.

Zookeeper is a Consensus (or Coordination) system.  Consensus is a major topic in theoretical and practical distributed systems and is what got me started digging into distributed systems originally.  To start getting familiar with Consensus I recommend:

On the academic textbook side, I have these on my stack to read:

This is just the tip of the iceberg.  Besides consensus, other distributed systems topics that I’ve found interesting include distributed databases, group membership, gossip protocols (used in Akka, Cassandra and Consul), time and clocks, and peer-to-peer systems.

Adventures in Clustering – part 2

Embedding a Zookeeper Server

To minimize the number of moving parts in the message delivery system I wanted to embed the Zookeeper server in the application, rather than running a separate ensemble of Zookeeper servers.

The embedded Zookeeper server is encapsulated in an EmbeddedZookeeper trait that is mixed into the ZookeeperService class from part 1.   Here is EmbeddedZookeeper:

There are two kinds of Zookeeper servers, standalone and replicated.  Standalone (aka Single) servers, which are typically used for local development and testing, use ZookeeperServerMain to start up. Replicated (aka Clustered or Multi-server) servers, which are replicated for high availability, use QuorumPeerMain.  I extend both of these classes to add a stop() method in a common interface.

A couple other things to mention about EmbeddedZookeeper: it is self-typed to ClusterService and Logging to get access to the nodeId and log methods.  Also there is an abstract clientPort method that is implemented by ZookeeperService.

ZookeeperService Initialization

The embedded server is configured and started as part of the initialize() method in ZookeeperService:

In initialize() a list of Zookeeper server hostnames is passed to genServerNames() which generates a ServerNames (see below).   Then the Zookeeper server is configured and started.

Server Configuration

ServerNames contains hosts, a collection of cluster node IDs and serversKey and serversVal, which corresponds to the server.x=[hostname]:nnnnn[:nnnnn] configuration settings for clustered servers (each server needs to know about all of the other servers).  If there are multiple elements in ServerNames.hosts then useReplicated will be true which will tell configureServer() to configure a replicated server.

In configureServer() the first thing that happens is the Zookeeper server data directory is removed and recreated. I found that the Zookeeper server data directory could get corrupted if the server wasn’t shut down cleanly. Instead of trying to maintain the data directory across restarts I decided to just recreate the data directory each time the application started up. The downside is that the server’s database has to be populated on each startup (it synchronizes the data from other servers). In this particular use case it’s ok because there is very little data being stored in Zookeeper (just the Leader Election znode and children). The upside is that no manual intervention is needed to address corrupt data directories.

Replicated Zookeeper servers require a myid file that identifies the ordinal position of the server in the ensemble. To avoid having to create this file manually I create it here as part of server startup.

Finally the appropriate Zookeeper server object is instantiated and configured with a set of Properties and a server startup function is returned for use by startServer(). A reference to the server object is saved in zkServer for shutdown later.

Server Startup

To start the server a new thread is created and in that thread the server startup function returned by configureServer() is run. The main application thread waits on a semaphore for the server to start.


One of the limitations of running Zookeeper in replicated mode is that a quorum of servers has to be running in order for the ensemble to respond to requests. A quorum is N/2 + 1 of the servers. In practice this means that there should be an odd number of servers and there needs to be at least three servers in the cluster. If there are three servers in the cluster then two of the servers have to be up and running for the clustering functionality to work. Depending on how you deploy your application it might not be possible to always keep at least N/2 + 1 servers running. If that is the case then embedded Zookeeper won’t be an option for you.

Also, one of the things that I noticed when running embedded Zookeeper is that during deployment as application JVMs are bounced you will get a lot of error noise in the logs coming from Zookeeper server classes. This is expected behavior but it might cause concern with your operations team.

Coming Up

In the next post I will come back to the ClusterService and fix a problem where multiple nodes might think they are the leader at the same time.

Adventures in Clustering – part 1

Last year I added clustering support to a system I had previously developed for a client. The requirements were to implement automated failover to eliminate a single point of failure and to distribute certain kinds of work among members of the cluster.

The application I was changing is a message delivery system, but my client has other applications with the same needs so the solution needed to be general enough to apply elsewhere.

Automating Failover with Leader Election

The message delivery system runs a number of scheduled jobs. For example, one of the scheduled jobs checks the database to see which new messages need to be delivered, delivers them, and updates message statuses after the delivery is complete. The message delivery system runs on JVMs on multiple machines for availability, but only one JVM should run the scheduled job, otherwise duplicate messages will be sent.

Before clustering support was added one of the JVMs was designated as the Delivery Server using a Java system property and only the Delivery Server ran the scheduled jobs. If the Delivery Server went down, one of the other JVMs had to be manually converted into the Delivery Server. This was suboptimal because it depended on humans to be available to both notice the problem and perform the failover, and the failover process was error-prone.

There are a number of ways to solve the the problem with the specific use case I just described. But there were other use cases where manual failover was also the problem, both in the message delivery system and in other applications the client has. I didn’t want to have a use case-specific solution for each problem.

To solve the problem generally I decided to use Leader Election. With Leader Election cluster members decide among themselves which member is the leader and the leader is responsible for certain tasks. The message delivery system already had the concept of a leader – the Delivery Server. I just needed to automate the process of choosing that leader.

The ClusterService

To support the Leader Election and work distribution features, I introduced the concept of a cluster service. When the service is initialized it starts the leader election process for that cluster member. At any time it can be queried to see if the current node is the leader, and who the other members of the cluster are. Here is the ClusterService interface:

In ClusterStatus, current, leader and participants are node IDs.

Scheduled Jobs

Previously only the Delivery Server ran scheduled jobs.  With Leader Election all of the cluster members run the scheduled jobs, but those jobs were changed to return immediately if the current cluster member is not the leader.   For example:

The distributeEvents job runs every 30 seconds on all cluster members. It gets the cluster status from the ClusterService and if the current node is the leader it calls distribute() to do the actual work.

Work Distribution

In the message delivery system the ClusterSystem is also used for work distribution. The leader distributes certain pieces of work to all of the nodes in the cluster. The ClusterSystem is queried for the cluster members. The cluster member node ID is mapped to a remote Akka actor. For example:

I will cover the work distribution system in detail in a separate post.

Zookeeper and Curator

I wanted to leverage a third-party clustering system as developing core clustering functionality is non-trivial. My first choice was Akka Cluster but when I was adding clustering support to the message delivery system Akka Cluster had not been released (it was just a high level design doc at that time). Zookeeper, on the other hand, had been on the scene for for a while. The Zookeeper client has a reputation of being difficult to work with so I decided to use Curator, a library that abstracts over the Zookeeper Java client.


ZookeeperService is the Curator-based implementation of the ClusterService:

ZookeeperService also has an embedded Zookeeper server which will be covered in part 2.

Leader Election using Curator

Curator has two ways of doing Leader Election: the LeaderSelector and LeaderLatch.

I first implemented leader election using LeaderSelector but I was unsatisfied with how complicated the resulting code was. After some discussion with Jordan Zimmerman, Curator’s developer, I reimplemented leader election using LeaderLatch. Here’s what it looks like:

The call to watchLeaderChildren() is optional. It’s only needed if you want to be notified when the leader changes or if a cluster member falls out of the cluster. In the message delivery system that wasn’t strictly necessary because it always checks who the leader is before doing something that only the leader should do. But it’s a nice thing to have for monitoring purposes:

In watchLeaderChildren a watch is set on the children of the LeaderLatch znode. Each child represents a cluster member and the first znode in the list of children is the leader.  If the set of children changes the watch is fired and the process() method is called. In process() the cluster status is queried and the watch is set again (Zookeeper watches are disabled after they fire).

Cluster Status using Curator

ZookeeperService.clusterStatus looks like:

It is a straightforward query of the LeaderLatch to populate the ClusterStatus object.

Application Startup

ClusterService.initialize is called at application startup if ClusterService.enabled is true. Here is the ZookeeperService implementation:

CuratorFramework is the high level Curator API.  It wraps a Curator Client which wraps the Zookeeper client. After the CuratorFramework is created the startup code blocks until the client has connected to the server and is ready to start working. Then selectLeader() is called to start the leader election process. Once the leader election process is started, Curator handles everything else under the hood (for example if the leader dies, a new node joins the cluster, or if one of the Zookeeper servers goes down).

Coming Up

In the next posts in this series I will embed the Zookeeper server into the application, handle a corner case when leadership changes while the leader is working, discuss work distribution in detail and I will talk about a port of the clustering functionality to Akka Cluster. Stay tuned!