Adventures in Clustering – part 2

Embedding a Zookeeper Server

To minimize the number of moving parts in the message delivery system, I wanted to embed the Zookeeper server in the application rather than running a separate ensemble of Zookeeper servers.

The embedded Zookeeper server is encapsulated in an EmbeddedZookeeper trait that is mixed into the ZookeeperService class from part 1. Here is EmbeddedZookeeper:
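A minimal sketch of the trait (ZooKeeperServerMain and QuorumPeerMain are the real ZooKeeper entry points; the Stoppable interface, class names, and zkServer field are assumptions based on the description below):

import org.apache.zookeeper.server.ZooKeeperServerMain
import org.apache.zookeeper.server.quorum.QuorumPeerMain

trait EmbeddedZookeeper { this: ClusterService with Logging =>
  // Implemented by ZookeeperService
  def clientPort: Int

  // Common interface so both server types can be stopped the same way
  trait Stoppable {
    def stop()
  }

  // Standalone server, used for local development and testing
  class StoppableZookeeperServerMain extends ZooKeeperServerMain with Stoppable {
    def stop() { shutdown() }   // expose the protected shutdown()
  }

  // Replicated server, used for high availability
  class StoppableQuorumPeerMain extends QuorumPeerMain with Stoppable {
    def stop() { if (quorumPeer != null) quorumPeer.shutdown() }
  }

  // Reference to the running server, saved for shutdown
  protected var zkServer: Option[Stoppable] = None
}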

There are two kinds of Zookeeper servers: standalone and replicated. Standalone (aka single) servers, which are typically used for local development and testing, start up via ZooKeeperServerMain. Replicated (aka clustered or multi-server) servers, which are replicated for high availability, start up via QuorumPeerMain. I extend both of these classes to add a stop() method behind a common interface.

A couple of other things to mention about EmbeddedZookeeper: it is self-typed to ClusterService and Logging to get access to the nodeId and log methods. Also, there is an abstract clientPort method that is implemented by ZookeeperService.

ZookeeperService Initialization

The embedded server is configured and started as part of the initialize() method in ZookeeperService:
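A sketch of the server-related part of initialize(), assuming the helper names described below (hosts is the configured list of hostnames):

def initialize() {
  val serverNames = genServerNames(hosts)      // hosts: configured Zookeeper hostnames
  val startFn = configureServer(serverNames)   // returns a server startup function
  startServer(startFn)
  // ... then connect the Curator client and start leader election (see part 1)
}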

In initialize() a list of Zookeeper server hostnames is passed to genServerNames(), which generates a ServerNames (see below). Then the Zookeeper server is configured and started.

Server Configuration

ServerNames contains hosts, a collection of cluster node IDs, along with serversKey and serversVal, which correspond to the server.x=[hostname]:nnnnn[:nnnnn] configuration settings for clustered servers (each server needs to know about all of the other servers). If there are multiple elements in ServerNames.hosts then useReplicated will be true, which tells configureServer() to configure a replicated server.
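A sketch of ServerNames (the exact field types are assumptions):

case class ServerNames(hosts: Seq[String],         // cluster node IDs
                       serversKey: Seq[String],    // e.g. "server.1", "server.2", ...
                       serversVal: Seq[String]) {  // e.g. "host1:2888:3888", ...
  def useReplicated = hosts.size > 1
}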

In configureServer() the first thing that happens is the Zookeeper server data directory is removed and recreated. I found that the Zookeeper server data directory could get corrupted if the server wasn’t shut down cleanly. Instead of trying to maintain the data directory across restarts I decided to just recreate the data directory each time the application started up. The downside is that the server’s database has to be populated on each startup (it synchronizes the data from other servers). In this particular use case it’s ok because there is very little data being stored in Zookeeper (just the Leader Election znode and children). The upside is that no manual intervention is needed to address corrupt data directories.

Replicated Zookeeper servers require a myid file that identifies the ordinal position of the server in the ensemble. To avoid having to create this file manually I create it here as part of server startup.

Finally the appropriate Zookeeper server object is instantiated and configured with a set of Properties and a server startup function is returned for use by startServer(). A reference to the server object is saved in zkServer for shutdown later.
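Putting those pieces together, configureServer() might look roughly like this. The property keys are standard Zookeeper configuration settings; dataDir, the initLimit/syncLimit values, and the Stoppable* classes from the EmbeddedZookeeper sketch are assumptions:

import java.io.File
import java.util.Properties
import org.apache.commons.io.FileUtils
import org.apache.zookeeper.server.ServerConfig
import org.apache.zookeeper.server.quorum.QuorumPeerConfig

private def configureServer(serverNames: ServerNames): () => Unit = {
  // Recreate the data directory to avoid corruption from unclean shutdowns
  FileUtils.deleteDirectory(dataDir)
  dataDir.mkdirs()

  val props = new Properties
  props.setProperty("dataDir", dataDir.getAbsolutePath)
  props.setProperty("clientPort", clientPort.toString)

  if (serverNames.useReplicated) {
    props.setProperty("initLimit", "5")   // assumed values
    props.setProperty("syncLimit", "2")
    // server.x entries so each server knows about all of the others
    (serverNames.serversKey zip serverNames.serversVal) foreach {
      case (key, value) => props.setProperty(key, value)
    }
    // myid identifies this server's ordinal position in the ensemble
    val myId = serverNames.hosts.indexOf(nodeId) + 1
    FileUtils.writeStringToFile(new File(dataDir, "myid"), myId.toString)

    val quorumConfig = new QuorumPeerConfig
    quorumConfig.parseProperties(props)
    val server = new StoppableQuorumPeerMain
    zkServer = Some(server)
    () => server.runFromConfig(quorumConfig)
  } else {
    val quorumConfig = new QuorumPeerConfig
    quorumConfig.parseProperties(props)
    val serverConfig = new ServerConfig
    serverConfig.readFrom(quorumConfig)
    val server = new StoppableZookeeperServerMain
    zkServer = Some(server)
    () => server.runFromConfig(serverConfig)
  }
}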

Server Startup

To start the server a new thread is created and in that thread the server startup function returned by configureServer() is run. The main application thread waits on a semaphore for the server to start.
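A sketch of startServer(), assuming the startup function returned by configureServer() (the readiness signal is simplified here to releasing the semaphore as soon as the server thread runs):

import java.util.concurrent.Semaphore

private def startServer(startFn: () => Unit) {
  val started = new Semaphore(0)
  val serverThread = new Thread(new Runnable {
    def run() {
      started.release()   // let the main thread proceed
      startFn()           // blocks for the lifetime of the server
    }
  }, "embedded-zookeeper-" + nodeId)
  serverThread.setDaemon(true)
  serverThread.start()
  started.acquire()       // wait for the server thread to start
}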

Notes

One of the limitations of running Zookeeper in replicated mode is that a quorum of servers has to be running in order for the ensemble to respond to requests. A quorum is N/2 + 1 of the servers. In practice this means that there should be an odd number of servers and there needs to be at least three servers in the cluster. If there are three servers in the cluster then two of the servers have to be up and running for the clustering functionality to work. Depending on how you deploy your application it might not be possible to always keep at least N/2 + 1 servers running. If that is the case then embedded Zookeeper won’t be an option for you.

Also, one thing I noticed when running embedded Zookeeper is that as application JVMs are bounced during a deployment you will get a lot of error noise in the logs from the Zookeeper server classes. This is expected behavior, but it might cause concern for your operations team.

Coming Up

In the next post I will come back to the ClusterService and fix a problem where multiple nodes might think they are the leader at the same time.

Adventures in Clustering – part 1

Last year I added clustering support to a system I had previously developed for a client. The requirements were to implement automated failover to eliminate a single point of failure and to distribute certain kinds of work among members of the cluster.

The application I was changing is a message delivery system, but my client has other applications with the same needs so the solution needed to be general enough to apply elsewhere.

Automating Failover with Leader Election

The message delivery system runs a number of scheduled jobs. For example, one of the scheduled jobs checks the database to see which new messages need to be delivered, delivers them, and updates message statuses after the delivery is complete. The message delivery system runs on JVMs on multiple machines for availability, but only one JVM should run the scheduled job, otherwise duplicate messages will be sent.

Before clustering support was added one of the JVMs was designated as the Delivery Server using a Java system property and only the Delivery Server ran the scheduled jobs. If the Delivery Server went down, one of the other JVMs had to be manually converted into the Delivery Server. This was suboptimal because it depended on humans to be available to both notice the problem and perform the failover, and the failover process was error-prone.

There are a number of ways to solve the problem for the specific use case I just described. But there were other use cases where manual failover was also a problem, both in the message delivery system and in other applications the client has. I didn’t want a use case-specific solution for each problem.

To solve the problem generally I decided to use Leader Election. With Leader Election cluster members decide among themselves which member is the leader and the leader is responsible for certain tasks. The message delivery system already had the concept of a leader – the Delivery Server. I just needed to automate the process of choosing that leader.

The ClusterService

To support the Leader Election and work distribution features, I introduced the concept of a cluster service. When the service is initialized it starts the leader election process for that cluster member. At any time it can be queried to see if the current node is the leader, and who the other members of the cluster are. Here is the ClusterService interface:
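A sketch of the interface, inferred from how it is used in this post (the isLeader convenience method is an assumption):

trait ClusterService {
  def enabled: Boolean             // is clustering turned on?
  def nodeId: String               // this cluster member's ID
  def initialize()                 // starts leader election for this member
  def clusterStatus: ClusterStatus
  def shutdown()
}

case class ClusterStatus(current: String,             // this node
                         leader: String,              // the current leader
                         participants: Seq[String]) { // all cluster members
  def isLeader = current == leader
}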

In ClusterStatus, current, leader and participants are node IDs.

Scheduled Jobs

Previously only the Delivery Server ran scheduled jobs. With Leader Election all of the cluster members run the scheduled jobs, but the jobs were changed to return immediately if the current cluster member is not the leader. For example:
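Something like this sketch, assuming clusterService and distribute() are in scope:

def distributeEvents() {
  val status = clusterService.clusterStatus
  if (status.isLeader) {
    distribute()   // only the leader does the actual work
  }
}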

The distributeEvents job runs every 30 seconds on all cluster members. It gets the cluster status from the ClusterService and if the current node is the leader it calls distribute() to do the actual work.

Work Distribution

In the message delivery system the ClusterService is also used for work distribution. The leader distributes certain pieces of work to all of the nodes in the cluster: the ClusterService is queried for the cluster members, and each member's node ID is mapped to a remote Akka actor. For example:
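A sketch of that mapping (the actor path, port, and actorSystem are assumptions, using the Akka 2.0-era actorFor lookup):

val status = clusterService.clusterStatus
val workers = status.participants.map { nodeId =>
  actorSystem.actorFor("akka://delivery@" + nodeId + ":2552/user/workDistributor")
}
// the leader can now spread work across the workers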

I will cover the work distribution system in detail in a separate post.

Zookeeper and Curator

I wanted to leverage a third-party clustering system, as developing core clustering functionality is non-trivial. My first choice was Akka Cluster, but when I was adding clustering support to the message delivery system Akka Cluster had not been released (it was just a high-level design doc at that time). Zookeeper, on the other hand, had been on the scene for a while. The Zookeeper client has a reputation for being difficult to work with, so I decided to use Curator, a library that abstracts over the Zookeeper Java client.

ZookeeperService

ZookeeperService is the Curator-based implementation of the ClusterService:
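A rough skeleton (constructor parameters and field names are assumptions); the individual methods are sketched in the sections that follow:

import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.leader.LeaderLatch

class ZookeeperService(hosts: Seq[String], val clientPort: Int, val nodeId: String)
    extends ClusterService with EmbeddedZookeeper with Logging {

  val enabled = true
  private var curator: CuratorFramework = _
  private var leaderLatch: LeaderLatch = _

  def initialize() {}                    // sketched under "Application Startup"
  def clusterStatus: ClusterStatus =     // sketched under "Cluster Status using Curator"
    sys.error("sketched below")

  def shutdown() {
    leaderLatch.close()
    curator.close()
  }
}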

ZookeeperService also has an embedded Zookeeper server which will be covered in part 2.

Leader Election using Curator

Curator has two ways of doing Leader Election: the LeaderSelector and LeaderLatch.

I first implemented leader election using LeaderSelector but I was unsatisfied with how complicated the resulting code was. After some discussion with Jordan Zimmerman, Curator’s developer, I reimplemented leader election using LeaderLatch. Here’s what it looks like:
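A sketch using the LeaderLatch (the latch path is an assumption):

def selectLeader() {
  leaderLatch = new LeaderLatch(curator, "/delivery/leader", nodeId)
  leaderLatch.start()
  watchLeaderChildren()
}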

The call to watchLeaderChildren() is optional. It’s only needed if you want to be notified when the leader changes or if a cluster member falls out of the cluster. In the message delivery system that wasn’t strictly necessary because it always checks who the leader is before doing something that only the leader should do. But it’s a nice thing to have for monitoring purposes:
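A sketch of watchLeaderChildren(), reusing the latch path from the selectLeader() sketch above:

import org.apache.curator.framework.api.CuratorWatcher
import org.apache.zookeeper.WatchedEvent

private def watchLeaderChildren() {
  curator.getChildren.usingWatcher(new CuratorWatcher {
    def process(event: WatchedEvent) {
      log.info("cluster membership changed: " + clusterStatus)
      watchLeaderChildren()   // watches only fire once, so set it again
    }
  }).forPath("/delivery/leader")
}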

In watchLeaderChildren a watch is set on the children of the LeaderLatch znode. Each child represents a cluster member and the first znode in the list of children is the leader.  If the set of children changes the watch is fired and the process() method is called. In process() the cluster status is queried and the watch is set again (Zookeeper watches are disabled after they fire).

Cluster Status using Curator

ZookeeperService.clusterStatus looks like:
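A sketch built on the LeaderLatch participant queries, using the ClusterStatus shape from the interface sketch above:

import scala.collection.JavaConverters._

def clusterStatus: ClusterStatus = {
  val participants = leaderLatch.getParticipants.asScala.toSeq
  ClusterStatus(current      = nodeId,
                leader       = leaderLatch.getLeader.getId,
                participants = participants.map(_.getId))
}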

It is a straightforward query of the LeaderLatch to populate the ClusterStatus object.

Application Startup

ClusterService.initialize is called at application startup if ClusterService.enabled is true. Here is the ZookeeperService implementation:
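A sketch of initialize(); the connection string construction and retry policy are assumptions:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

def initialize() {
  // Part 2: configure and start the embedded Zookeeper server
  startServer(configureServer(genServerNames(hosts)))

  curator = CuratorFrameworkFactory.newClient(
    hosts.map(_ + ":" + clientPort).mkString(","),
    new ExponentialBackoffRetry(1000, 3))
  curator.start()

  // Block until the client has connected to the server
  curator.getZookeeperClient.blockUntilConnectedOrTimedOut()

  selectLeader()
}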

CuratorFramework is the high-level Curator API. It wraps a Curator client, which in turn wraps the Zookeeper client. After the CuratorFramework is created the startup code blocks until the client has connected to the server and is ready to start working. Then selectLeader() is called to start the leader election process. Once leader election is started, Curator handles everything else under the hood (for example, if the leader dies, a new node joins the cluster, or one of the Zookeeper servers goes down).

Coming Up

In the next posts in this series I will embed the Zookeeper server into the application, handle a corner case when leadership changes while the leader is working, discuss work distribution in detail and I will talk about a port of the clustering functionality to Akka Cluster. Stay tuned!

An Auto-Updating Caching System – part 2

In the previous post we imagined that we needed to build a caching system in front of a slow backend system. The cache needed to meet the following requirements:

  • The data in the backend system is constantly being updated so the caches need to be updated every N minutes.
  • Requests to the backend system need to be throttled.

Akka actors looked like a good fit for the requirements. Each actor would handle a query for the backend system and cache the results. In part 1 I talked about the CacheSystem, which created CacheActors and provided helper methods for working with the caches. In this post I will cover the CacheActor class hierarchy.

Here is the base CacheActor class:

// Imports added for completeness (Akka 2.0-era APIs, where Future and
// ExecutionContext live in akka.dispatch)
import akka.actor.{Actor, ActorRef, Status}
import akka.dispatch.{ExecutionContext, Future}
import net.sf.ehcache.Cache
import CacheActor._

abstract class CacheActor[V](cacheSystem: CacheSystem) extends Actor with Logging {
  implicit val execContext: ExecutionContext = context.dispatcher

  def findValueReceive: Receive = {
    case FindValue(params) => findValueForSender(params, sender)
  }

  def findValueForSender(params: Params, sender: ActorRef) {
    val key = params.cacheKey
    val elem = cache.get(key)

    if (elem != null) {
      sender ! elem.getObjectValue.asInstanceOf[V]
    } else {
      Future { findObject(params) }.onComplete {
        case Right(result) => result match {
          case Some(value) => sender ! value
          case None => sender ! Status.Failure(new Exception("findObject returned None for key=" + key + " cache=" + cache.getName))
        }
        case Left(ex) => sender ! Status.Failure(ex)
      }
    }
  }

  def findObject(params: Params): Option[V] = {
    cacheSystem.findObjectForCache(params.cacheKey, cache, 
                                   finder(params))
  }

  // Provided by subclasses
  val cache: Cache
  def finder(params: Params): () => V
}

object CacheActor {
  case class FindValue(params: Params)

  trait Params {
    def cacheKey: String
  }
}

Part 1 showed an example of a CachingBusinessService sending a FindValue message to a Service1CacheActor using the ? (ask) method.  findValueReceive handles FindValue by either returning a value from the cache or making a call to the backend (via CacheSystem.findObjectForCache) to get the value.

Concrete CacheActors are responsible for implementing finder which returns a function to query the backend system. The returned function is ultimately executed by CacheSystem.findObjectForCache.

Part 1 also showed CacheSystem sending UpdateCacheForNow messages to periodically update cache values. UpdateCacheForNow is handled by a subclass of CacheActor, DateCacheActor:

// Imports added for completeness; Range, DateUtil and DateParams are the
// author's own types (DateParams is sketched later in this post)
import java.util.Date
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import akka.dispatch.Future
import DateCacheActor.{UpdateCacheForNow, UpdateCacheForPreviousBusinessDay}

abstract class DateCacheActor[V](cacheSystem: CacheSystem)
    extends CacheActor[V](cacheSystem) {

  override def receive = findValueReceive orElse  {
    case UpdateCacheForNow => updateCacheForNow()

    case UpdateCacheForPreviousBusinessDay => updateCacheForPreviousBusinessDay()
  }

  def updateCacheForNow() {
    val activeBusinessDay: Range[Date] = DateUtil.calcActiveBusinessDay
    val start = activeBusinessDay.getStart
    val now = new Date

    // If today is a business day and now is within the business day,
    // retrieve data from the backend and put it in the cache
    if (now.getTime >= start.getTime &&
        now.getTime <= activeBusinessDay.getEnd.getTime) {
      updateCacheForDate(now)
    }
  }

  def updateCacheForPreviousBusinessDay() {
    updateCacheForDate(DateUtil.calcActiveBusinessDay.getStart)
  }

  def updateCacheForDate(date: Date) {
    import DateCacheActor._    // Use separate thread pool
    Future { findObject(new DateParams(date)) }
  }
}

object DateCacheActor {
  // Update cache for the current time
  case object UpdateCacheForNow  

  // Update cache for previous business day   
  case object UpdateCacheForPreviousBusinessDay

  // updateCacheForDate() uses a separate thread pool to prevent scheduled tasks 
  // from interfering with user requests
  val FUTURE_POOL_SIZE = 5
  val FUTURE_QUEUE_SIZE = 20000

  private lazy val ucfdThreadPoolExecutor =
    new ThreadPoolExecutor(FUTURE_POOL_SIZE, FUTURE_POOL_SIZE, 1, TimeUnit.MINUTES,
                           new ArrayBlockingQueue[Runnable](FUTURE_QUEUE_SIZE, true))
  implicit lazy val ucfdExecutionContext: ExecutionContext = 
    ExecutionContext.fromExecutor(ucfdThreadPoolExecutor)
}

During non-business hours UpdateCacheForNow messages are ignored and values from the previous business day are returned from the cache. If the app is started during non-business hours, an UpdateCacheForPreviousBusinessDay message is scheduled to populate cache values for the previous business day.

A separate thread pool is used to perform the backend system queries for the scheduled UpdateCacheFor* tasks. We don’t want them to interfere with user requests, which are handled using the regular actor thread pool.

Here is what a concrete DateCacheActor would look like, using the Service1CacheActor from part 1 as an example:

// Imports added for completeness
import java.util.{Date, List => JList}
import akka.dispatch.Future
import net.sf.ehcache.Cache
import CacheActor.Params

class Service1CacheActor(val cache: Cache, cacheSystem: CacheSystem,
                         bizService: BusinessService)
    extends DateCacheActor[JList[Service1Result]](cacheSystem) {

  override def receive = super.receive

  override def updateCacheForDate(date: Date) {
    import DateCacheActor._
    Future { findObject(new Service1Params(date, true)) }
    Future { findObject(new Service1Params(date, false)) }
  }

  def finder(params: Params) = { () =>
    params match {
      case p: Service1Params => bizService.service1(p.date, p.useFoo)
      case _ => throw new IllegalArgumentException("...") 
    }
  }
}

class Service1Params(date: Date, val useFoo: Boolean) extends DateParams(date) {
  override def cacheKey = super.cacheKey + ":" + useFoo
}

Service1CacheActor’s implementation of updateCacheForDate finds and caches the results of both the true and false variations of the BusinessService.service1 backend system call.
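DateParams itself isn’t shown in these posts; a minimal version consistent with its usage might be:

import java.text.SimpleDateFormat
import java.util.Date

// Assumed shape of DateParams, based on how it is used above
class DateParams(val date: Date) extends CacheActor.Params {
  def cacheKey: String = "date:" + new SimpleDateFormat("yyyyMMdd").format(date)
}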

If we wanted to cache another one of BusinessService‘s methods using the auto-updating caching system we would:

  1. Subclass DateCacheActor, implement finder and potentially override updateCacheForDate.
  2. Subclass DateParams, providing the parameters to the backend query, and override the cacheKey method.
  3. Call createCacheActor again in CachingBusinessService to create the new DateCacheActor from #1, and write a cached version of the backend query method, sending FindValue to the new actor and waiting for the response.

An Auto-Updating Caching System – part 1

Imagine you needed to build a caching system in front of a slow backend system with the following requirements:

  • The data in the backend system is constantly being updated so the caches need to be updated every N minutes.
  • Requests to the backend system need to be throttled.

Here’s a possible solution taking advantage of Akka actors and Scala’s support for functions as first-class objects. The first piece of the puzzle is a CacheSystem which creates and queries EhCache caches:

// Imports added for completeness (Akka 2.0-era scheduler and duration APIs);
// DateUtil is the author's utility (not shown)
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.util.Duration
import akka.util.duration._
import net.sf.ehcache.{Cache, CacheManager, Element}
import net.sf.ehcache.config.CacheConfiguration
import DateCacheActor.{UpdateCacheForNow, UpdateCacheForPreviousBusinessDay}

class CacheSystem(name: String, updateIntervalMin: Int, cacheManager: CacheManager) {
  var caches = List.empty[Cache]
  val actorSystem = ActorSystem("cache_" + name)

  val DEFAULT_TTL_SEC = 86400   // 1 day

  def addCache(name: String, size: Int, ttlSeconds: Int = DEFAULT_TTL_SEC): Cache = {
    val config = new CacheConfiguration(name, size)
    config.setTimeToIdleSeconds(ttlSeconds)
    val cache = new Cache(config)
    cacheManager.addCache(cache)
    caches = cache :: caches
    cache
  }

  def createCacheActor(cacheName: String, cacheSize: Int, scheduleDelay: Duration, 
                       actorCreator: (Cache, CacheSystem) => Actor, 
                       ttlSeconds: Int = DEFAULT_TTL_SEC): ActorRef = {

    val cache = addCache(cacheName, cacheSize, ttlSeconds)
    val actor = actorSystem.actorOf(Props(actorCreator(cache, this)), 
                                    name = cacheName + "CacheActor")

    actorSystem.scheduler.schedule(scheduleDelay, updateIntervalMin minutes, 
                                   actor, UpdateCacheForNow)   
    if (!DateUtil.isNowInActiveBusinessDay) {
      actorSystem.scheduler.scheduleOnce(scheduleDelay, actor, 
                                         UpdateCacheForPreviousBusinessDay)
    }

    actor
  }

  def findCachedObject[T](key: String, cache: Cache, finder: () => T): Option[T] = {
    val element = cache.get(key)

    if (element == null) {
      findObjectForCache(key, cache, finder)
    } else {
      Some(element.getObjectValue.asInstanceOf[T])
    }
  }

  def findObjectForCache[T](key: String, cache: Cache, finder: () => T): Option[T] = {
    val domainObj = finder()
    if (domainObj != null) {
      val element = new Element(key, domainObj)
      cache.put(element)
      Some(domainObj)
    } else {
      None
    }
  }

  def clearAllCaches() {
    caches.foreach(_.removeAll())
  }
}

The createCacheActor method creates a cache and an actor, and schedules tasks to periodically update the cache. I decided to use actors for this because the actor system’s thread pool is a good way to meet the throttling requirement. In addition, the Akka API makes it easy to have scheduled tasks send messages to actors. createCacheActor takes a (Cache, CacheSystem) => Actor function to create the actor. It then fills in those parameters to create the actor using the Akka actorOf method.

The findCachedObject and findObjectForCache methods take a finder function of () => T that handles the lookup of objects from the backend system.

Here is an example of the CacheSystem being used by the business logic layer:

// Imports added for completeness; cacheSystem and DATE_CACHE_SIZE are assumed
// to be provided elsewhere (e.g. injected or defined in a companion)
import java.util.{Date, List => JList}
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.Timeout
import akka.util.duration._
import CacheActor.FindValue

class CachingBusinessService(bizService: BusinessService) extends BusinessService {
  implicit val timeout = Timeout(60 seconds)

  val service1CacheActor = 
    cacheSystem.createCacheActor("service1", DATE_CACHE_SIZE, 0 seconds, 
                                 new Service1CacheActor(_, _, bizService))
  // ... more actors created here

  def service1(date: Date, useFoo: Boolean): JList[Service1Result] = {
    val future = 
      service1CacheActor ? FindValue(new Service1Params(date, useFoo))
    Await.result(future, timeout.duration).asInstanceOf[JList[Service1Result]]
  }

  // ... more service methods
}

The CachingBusinessService is a caching implementation of the BusinessService interface. It creates CacheActors to service the requests. To create the Service1CacheActor it passes a partially applied constructor function, new Service1CacheActor(_, _, bizService), to createCacheActor.

The caching implementation of service1 sends a FindValue message to the service1CacheActor, using the ? (ask) method which returns an Akka Future.  Then it waits for the result of the future and returns it to the caller.

Using Await.result should raise a red flag: you don’t want to block if you don’t have to (especially inside of an actor). However, in this case the BusinessService is being called as part of a REST API served by a non-async HTTP server. Before the caching layer was introduced it would block waiting for the backend to respond.

Here’s the code for the FindValue message and the Params that it contains.  Params are the parameters for the backend query. Each unique Params object corresponds to a cache entry so each Params subclass is responsible for generating the appropriate cache key.

object CacheActor {
  case class FindValue(params: Params)

  trait Params {
    def cacheKey: String
  }
}

In the next post I’ll describe the CacheActor class hierarchy.

Using Groovy Closures as Scala Functions

I have a Scala trait for persistence and transaction management (which I will blog about in more detail later). The trait looks like:

// Assuming Spring's TransactionStatus; PropagationBehavior and
// PropagationRequired are the author's own types (not shown)
import org.springframework.transaction.TransactionStatus

trait DomainManager {
  def get[E](id: Long)(implicit m: ClassManifest[E]): Option[E]

  def find[E](namedQuery: String, params: Map[String, Any] = null): Option[E]

  //...more methods

  def withTransaction[R](f: (TransactionStatus) => R,
            readOnly: Boolean = false,
            propagationBehavior: PropagationBehavior = PropagationRequired): R
}

Let’s take a look at withTransaction() specifically. It is called like:

val result = domainManager.withTransaction { txStatus =>
  // Access database here
}

If your application is written in Scala or Java it is sometimes handy to have certain pieces of it written in Groovy, to be able to easily change a class and reload it without restarting the application. Your Groovy code will be able to call any of your Java classes, but what about your Scala classes? For example, what if we want to call withTransaction() on DomainManager? How do we deal with the f parameter? And what about the default parameter values?

Groovy and Scala both have the concept of functions as first class objects. In Groovy they are called closures and they are implemented by the class groovy.lang.Closure. In Scala they are called functions and they are implemented by the traits scala.Function0, scala.Function1, ... scala.FunctionN, where N is the number of parameters to the function.

In withTransaction() the type of f is Function1[TransactionStatus, R].  It is a function that takes one parameter of type TransactionStatus and returns a generic R.   

Through Groovy magic closures can be coerced to arbitrary interfaces. For example, I wrapped the withTransaction() method in Groovy like this:

def withTransaction(Closure closure) {
    domainManager.withTransaction(closure as Function1<TransactionStatus, Object>,
                                  false, PropagationRequired)
}

Here the closure parameter is coerced (using the Groovy as coercion operator) to a Scala Function1[TransactionStatus, _] which is the proper type after the generic parameter R is erased.

You cannot use Scala’s default parameter values in Groovy or Java so the last two parameters (readOnly and propagationBehavior) need to be passed explicitly.

Now I can call my Groovy withTransaction() with a closure like:

  def result = withTransaction { txStatus ->
    // Access database here
  }

The same technique can be used to call Scala collection methods like List.map() with Groovy closures.
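For instance, a sketch of mapping over a scala.collection.immutable.List from Groovy (scalaList is assumed to be a Scala List of integers; the CanBuildFrom that the Scala compiler would normally supply implicitly has to be passed as an explicit second argument):

// Scala's List.map takes an implicit CanBuildFrom in a second parameter list,
// which flattens into an explicit second argument when called from Groovy
def doubled = scalaList.map(
    { it * 2 } as scala.Function1,
    scala.collection.immutable.List$.MODULE$.canBuildFrom())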

Templating XML data with Velocity

Velocity is an easy-to-use templating system for the JVM. It’s commonly used to code templates for web pages and email. To use Velocity you pass it a template (a string) and a context, which is a map of Javabeans and collections of Javabeans. The template is coded using the Velocity template language. Here is an example of a template (taken from the Velocity User Guide):

Hello $customer.Name!
<table>
#foreach( $mud in $mudsOnSpecial )
   #if ( $customer.hasPurchased($mud) )
      <tr>
        <td>
          $flogger.promo( $mud )
        </td>
      </tr>
   #end
#end
</table>

What if some of your data is not in Javabean form, but is free form XML (free form meaning you don’t have any control over what the structure is going to be)?

Static languages like Scala and Java are pretty limited for dealing with free form XML. You can parse it into a DOM-like tree or parse it using a SAX-like streaming parser. Then to make the data available to Velocity you would write a Velocity-compatible adapter for the chosen XML API.

Groovy has really nice XML handling capabilities. You can parse XML and then use the results using regular Groovy code, not ugly DOM walking. For example, given this XML:

    <records>
      <car name='HSV Maloo' make='Holden' year='2006'>
        <country>Australia</country>
        <record type='speed'>Production Pickup Truck with speed of 271kph</record>
      </car>
      <car name='P50' make='Peel' year='1962'>
        <country>Isle of Man</country>
        <record type='size'>Smallest Street-Legal Car at 99cm wide and 59 kg</record>
      </car>
      <car name='Royale' make='Bugatti' year='1931'>
        <country>France</country>
        <record type='price'>Most Valuable Car at $15 million</record>
      </car>
    </records>

You can parse and use it like this in Groovy:

def records = new XmlSlurper().parseText(xml)

def allRecords = records.car
assert 3 == allRecords.size()
def allNodes = records.depthFirst().collect{ it }
assert 10 == allNodes.size()
def firstRecord = records.car[0]
assert 'car' == firstRecord.name()
assert 'Holden' == firstRecord.@make.text()
assert 'Australia' == firstRecord.country.text()
def carsWith_e_InMake = 
  records.car.findAll{ it.@make.text().contains('e') }

Using the Groovy API you can write a Velocity adapter for free form XML that exposes most of the power of the native Groovy language features. The Groovy API is just another set of classes that you can use in any JVM application. You can use the API without using the Groovy language itself.

Here is the adapter code in Scala. It wraps Groovy GNodes and GNodeLists with objects that are compatible with Velocity:

import groovy.util.{XmlParser, Node => GNode, NodeList => GNodeList}

object GNodeWrapper {
  def xmlToGNode(xml: String) = 
    GNodeWrapper(new XmlParser().parseText(xml))

  def wrapGNodes(n: Any): AnyRef = n match {
    case list: GNodeList => GNodeListWrapper(list)
    case node: GNode => GNodeWrapper(node)
    case x @ _ => x.asInstanceOf[AnyRef]
  }
}
import GNodeWrapper._

case class GNodeWrapper(node: GNode) {
  def get(key: String) = {
    val gnode = node.get(key)
    gnode match {
      case list: GNodeList if list.size == 1 =>
        val n = list.get(0).asInstanceOf[GNode]
        if (n.children.size == 1) {
          n.children.get(0) match {
            case _: GNode => wrapGNodes(n)
            case x @ _ => x
          }
        } else {
          wrapGNodes(n)
        }
      case x @ _ => wrapGNodes(x)
    }
  }

  override def toString: String = node.text
}

case class GNodeListWrapper(nodeList: GNodeList) {
  def get(key: String) = wrapGNodes(nodeList.getAt(key))

  def get(index: Int) = wrapGNodes(nodeList.get(index))

  def size = nodeList.size

  def isEmpty = nodeList.isEmpty

  def iterator = GNodeListIterator(nodeList.iterator)

  override def toString: String = {
    if (nodeList.size == 0) ""
    else
      nodeList.get(0) match {
        case node: GNode => node.text
        case x @ _ => x.toString
      }
  }
}

case class GNodeListIterator(iter: java.util.Iterator[_]) 
      extends java.util.Iterator[AnyRef] {
  def hasNext = iter.hasNext

  def next = wrapGNodes(iter.next)

  def remove() = iter.remove()
}

That’s not much code, especially compared to what the Java/DOM equivalent would be.

Using the adapter looks like:

  // xml is a string containing the sample XML from above
  import org.apache.velocity.VelocityContext
  import scala.collection.JavaConverters._   // for contextData.asJava

  val contextData =
    Map("title" -> "test title",
        "content" -> "test content",
        "meta" -> GNodeWrapper.xmlToGNode(xml))

  // Renders template and contextData to stringWriter
  velocityEngine.evaluate(new VelocityContext(contextData.asJava),
                          stringWriter, "example", template)

So a template that looks like this:

title=$title
content=$content
country=$meta.records.car[0].country
year=$meta.records.car[2].get('@year')
numcars=$meta.records.car.size()
names=#foreach($c in $meta.records.car)$c.get('@name') #end

Would render like this:

title=test title
content=test content
country=Australia 
year=1931
numcars=3
names=HSV Maloo P50 Royale

The same technique could be used to render free form XML in another templating system like JSP.

Composing Traits and Declarative Validation

Scala’s traits are a nice fit for JSR-303 validation. Here’s an example. Suppose we have a web service interface that has methods like:

@WebService
trait Notification {
  def deleteTopic(apiKey: String, topicId: Long)
  def getSubscriber(apiKey: String, userId: String): Subscriber
  def unsubscribe(apiKey: String, userId: String, topicId: Long, context: SubscriptionContext)
  //...
}

Notice that the methods share many but not all of the same parameters. For example, all of them take apiKey. Our validation code should be factored to handle common parameters independently.  JSR-303 supports this very well. However, we also want to return validation errors to the caller with names that correspond directly to the web service method parameters. For example, if deleteTopic() is called with an invalid topicId, we want to respond with a fault like:

<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
   <soap:Body>
      <soap:Fault>
         <faultcode>soap:Client</faultcode>
         <faultstring>Errors in request</faultstring>
         <errors>
           <error>
             <inputElement>topicId</inputElement>
             <errorMessage>99 is invalid</errorMessage>
          </error>
        </errors>
      </soap:Fault>
   </soap:Body>
</soap:Envelope>

The parameter name/validation error requirement would be a pain to handle in Java because the validation object would need to be composed as a hierarchy of individual objects. The hierarchy would be reflected in the validation errors and would have to be manually flattened before returning the errors to the caller.

In our web service implementation we will validate the parameters by creating a flat object containing the method parameters as properties.  Then we will call throwFaultIfInvalid() which will validate the parameters and throw a Fault exception if there are errors. For example:

  def unsubscribe(apiKeyIn: String, userIdIn: String, topicIdIn: Long, context: SubscriptionContext) {
    val cmd = new UnsubscribeCmd {
      apiKey = apiKeyIn
      userId = userIdIn
      topicId = topicIdIn
      subscriptionContext = context
    }
    throwFaultIfInvalid(cmd)

    // Logic to do unsubscribe...
  }

Above, cmd is a stack of five validation traits on top of a base Object.

@UnsubscribeCmdValid
trait UnsubscribeCmd extends UserIdCmd with TopicIdCmd with SubscriptionContextCmd {
  var subscription: Subscription = _
}

@UserIdValid
trait UserIdCmd extends ApiKeyCmd {
  @NotEmpty var userId: String = _
  lazy val subscriber: Option[Subscriber] = // findSubscriber(userId)
}

@TopicIdValid
trait TopicIdCmd extends ApiKeyCmd {
  @NotZero var topicId: Long = _
  lazy val topic: Option[Topic] = // findTopic(topicId)
}

@SubscriptionContextValid
trait SubscriptionContextCmd extends ApiKeyCmd {
  @NullInvalid var subscriptionContext: SubscriptionContext = _
}

@ApiKeyValid
trait ApiKeyCmd {
  @NotEmpty var apiKey: String = _
  lazy val application: Option[Application] = // findApplication(apiKey)
}

The validator for UnsubscribeCmd looks like:

class UnsubscribeCmdValidator extends Validator[UnsubscribeCmdValid, UnsubscribeCmd] {
  @Autowired var subscriptionDao: SubscriptionDao = _

  def isValid(obj: UnsubscribeCmd, context: ConstraintValidatorContext): Boolean = {
    // Reconstructed from a garbled snippet: ifSome is assumed to be a helper
    // that runs the block and returns true when the Option is defined, false
    // otherwise; subscriptionDao.find(subscriber, topic) is likewise assumed
    // to return an Option[Subscription]
    ifSome(for {
      subscriber   <- obj.subscriber
      topic        <- obj.topic
      subscription <- subscriptionDao.find(subscriber, topic)
    } yield subscription) { subscription =>
      obj.subscription = subscription
    }
  }
}
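throwFaultIfInvalid() isn’t shown in the post; a sketch, assuming an injected JSR-303 Validator and a Fault exception type that carries (inputElement, errorMessage) pairs, might be:

def throwFaultIfInvalid(cmd: AnyRef) {
  import scala.collection.JavaConverters._
  val violations = validator.validate(cmd).asScala   // javax.validation.Validator
  if (violations.nonEmpty) {
    // Because the command object is flat, each property path is exactly the
    // web service parameter name the caller used
    val errors = violations.map(v => (v.getPropertyPath.toString, v.getMessage))
    throw new Fault("Errors in request", errors.toSeq)
  }
}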

Nice! Traits allow us to compose the validators together in a way that results in very clean validation objects and validation code.

Side note

Sadly, as of Scala 2.9 you can’t create custom JSR-303 annotations in Scala (like @UnsubscribeCmdValid above), because they require RetentionPolicy.RUNTIME. Go, right now, and vote for SI-32 so we don’t have to keep writing these in Java. Thank you for your support.

Targeting annotations to Javabean accessors in Scala

On a recent project I had written a simple scheduling service. I wanted to expose some properties and methods to JMX so I could view the configuration at runtime and so I could perform some basic operations, like starting the service.

Spring provides a convenient way of exposing your classes to JMX using annotations, eliminating a ton of boilerplate. My initial stab at decorating the service for JMX looked like:

// Imports added for completeness (scala.reflect.BeanProperty moved to
// scala.beans in later Scala versions)
import org.springframework.beans.factory.annotation.Value
import org.springframework.jmx.export.annotation.{ManagedAttribute, ManagedOperation, ManagedResource}
import org.springframework.stereotype.Component
import scala.reflect.BeanProperty

@ManagedResource
@Component("schedulerService")
class SchedulerServiceImpl extends SchedulerService {

  @Value("${scheduler.enabled:false}")
  @ManagedAttribute @BeanProperty
  val enabled: Boolean = false

  @ManagedAttribute @BeanProperty
  var started: Boolean = _

  @ManagedOperation
  def start(): String = {
    //...
  }
  //...
}

@ManagedAttribute has to be put on Javabean getters and setters so I added @BeanProperty to the properties I wanted to expose. Actually I only wanted to expose the getter, but initially I wasn’t worried about that part.

I fired up jconsole and saw that the start() method was exposed but the started and enabled properties were not. The usual tool for debugging Scala-Java interop, javap, doesn’t display information on annotations, so I used the jclasslib bytecode viewer. There I saw that the annotation had actually been set on the private field that Scala generated, not on the Javabean setter.

[Screenshot: jclasslib showing the annotation set on the generated private "started" field]
After some asking around, Iulian Dragos told me about the meta-annotations in the scala.annotation.target package. These let you target annotations at specific accessors.

This allows you to create Javabean accessor-targeted annotations like this:

@(ManagedAttribute @beanGetter) @BeanProperty
val enabled: Boolean = false

That is cool but pretty verbose. Fortunately you can clean it up with a type alias:

import org.springframework.jmx.export.annotation.ManagedAttribute
import scala.annotation.target.{beanGetter, beanSetter}

object JmxConfig {
  type ManagedGetter = ManagedAttribute @beanGetter
  type ManagedSetter = ManagedAttribute @beanSetter
}

So the end result can look like:

import JmxConfig._

@ManagedResource
@Component("schedulerService")
class SchedulerServiceImpl extends SchedulerService {

  @Value("${scheduler.enabled:false}")
  @ManagedGetter @BeanProperty
  val enabled: Boolean = false

  @ManagedGetter @BeanProperty
  var started: Boolean = _

  //....
}