Archives for July 2012

An Auto-Updating Caching System – part 2

In the previous post we imagined that we needed to build a caching system in front of a slow backend system. The cache needed to meet the following requirements:

  • The data in the backend system is constantly being updated so the caches need to be updated every N minutes.
  • Requests to the backend system need to be throttled.

Akka actors looked like a good fit for the requirements. Each actor would handle a query for the backend system and cache the results. In part 1 I talked about the CachingSystem which created CacheActors and provided helper methods for working with the caches. In this post I will cover the CacheActor class hierarchy.

Here is the base CacheActor class:

abstract class CacheActor[V](cacheSystem: CacheSystem) extends Actor with Logging {
  implicit val execContext: ExecutionContext = context.dispatcher

  def findValueReceive: Receive = {
    case FindValue(params) => findValueForSender(params, sender)
  }

  def findValueForSender(params: Params, sender: ActorRef) {
    val key = params.cacheKey
    val elem = cache.get(key)

    if (elem != null) {
      sender ! elem.getObjectValue.asInstanceOf[V]
    } else {
      Future { findObject(params) }.onComplete {
        case Right(result) => result match {
          case Some(value) => sender ! value
          case None => sender ! Status.Failure(new Exception("findObject returned None for key=" + key + " cache=" + cache.getName))
        }
        case Left(ex) => sender ! Status.Failure(ex)
      }
    }
  }

  def findObject(params: Params): Option[V] = {
    cacheSystem.findObjectForCache(params.cacheKey, cache, 
                                   finder(params))
  }

  // Provided by subclasses
  val cache: Cache
  def finder(params: Params): () => V
}

object CacheActor {
  case class FindValue(params: Params)

  trait Params {
    def cacheKey: String
  }
}

Part 1 showed an example of a CachingBusinessService sending a FindValue message to a Service1CacheActor using the ? (ask) method.  findValueReceive handles FindValue by either returning a value from the cache or making a call to the backend (via CacheSystem.findObjectForCache) to get the value.

Concrete CacheActors are responsible for implementing finder which returns a function to query the backend system. The returned function is ultimately executed by CacheSystem.findObjectForCache.

Part 1 also showed CacheSystem sending UpdateCacheForNow messages to periodically update cache values. UpdateCacheForNow is handled by a subclass of CacheActorDateCacheActor:

abstract class DateCacheActor[V](cacheSystem: CacheSystem) 
    extends CacheActor[V](cacheSystem) {

  override def receive = findValueReceive orElse  {
    case UpdateCacheForNow => updateCacheForNow()

    case UpdateCacheForPreviousBusinessDay => updateCacheForPreviousBusinessDay()
  }

  def updateCacheForNow() {
    val activeBusinessDay: Range[Date] = DateUtil.calcActiveBusinessDay
    val start = activeBusinessDay.getStart
    val now = new Date

    // If today is a business day and now is within the business day, 
    // retrieve data from the backend and put in the cache
    if (now.getTime >= start.getTime && 
        now.getTime <= activeBusinessDay.getEnd.getTime)
      updateCacheForDate(now)
    } 
  }

  def updateCacheForPreviousBusinessDay() {
    updateCacheForDate(DateUtil.calcActiveBusinessDay.getStart)
  }

  def updateCacheForDate(date: Date) {
    import DateCacheActor._    // Use separate thread pool
    Future { findObject(new DateParams(date)) }
  }
}

object DateCacheActor {
  // Update cache for the current time
  case object UpdateCacheForNow  

  // Update cache for previous business day   
  case object UpdateCacheForPreviousBusinessDay

  // updateCacheForDate() uses a separate thread pool to prevent scheduled tasks 
  // from interfering with user requests
  val FUTURE_POOL_SIZE = 5
  val FUTURE_QUEUE_SIZE = 20000

  private lazy val ucfdThreadPoolExecutor = 
    new ThreadPoolExecutor(FUTURE_POOL_SIZE, FUTURE_POOL_SIZE, 1, TimeUnit.MINUTES, 
                           new ArrayBlockingQueue(FUTURE_QUEUE_SIZE, true))
  implicit lazy val ucfdExecutionContext: ExecutionContext = 
    ExecutionContext.fromExecutor(ucfdThreadPoolExecutor)
}

During non-business hours values UpdateCacheForNow messages are ignored and values from the previous business day are returned from the cache.  If the app is started during non-business hours an UpdateCacheForPreviousBusinessDay message is scheduled to populate cache values for the previous business day.

A separate thread pool is used to perform the backend system queries for the scheduled UpdateCacheFor* tasks.   We don’t want them to interfere with user requests which are handled using the regular actor thread pool.

Here is what a concrete DateCacheActor would look like, using the Service1CacheActor from part 1 as an example:

class Service1CacheActor(val cache: Cache, cacheSystem: CacheSystem, 
                         bizService: BusinessService) 
    extends DateCacheActor[JList[Service1Result]](cacheSystem) {

  override def receive = super.receive

  override def updateCacheForDate(date: Date) {
    import DateCacheActor._
    Future { findObject(new Service1Params(date, true)) }
    Future { findObject(new Service1Params(date, false)) }
  }

  def finder(params: Params) = { () =>
    params match {
      case p: Service1Params => bizService.service1(p.date, p.useFoo)
      case _ => throw new IllegalArgumentException("...") 
    }
  }
}

class Service1Params(date: Date, val useFoo: Boolean) extends DateParams(date) {
  override def cacheKey = super.cacheKey + ":" + useFoo
}

Service1CacheActor‘s implementation of updateCacheForDate finds and caches the results of the true and false variations of the BusinessService.service1 backend system call.

If we wanted to cache another one of BusinessService‘s methods using the auto-updating caching system we would:

  1. Subclass DateCacheActor, implement finder and potentially override updateCacheForDate.
  2. Subclass DateParams, providing the parameters to the backend query, and override the cacheKey method.
  3. Call createCacheActor again in CachingBusinessService to create the new DateCacheActor from #1, and write a cached version of the backend query method, sending FindValue to the new actor and waiting for the response.

An Auto-Updating Caching System – part 1

Imagine you needed to build a caching system in front of a slow backend system with the following requirements:

  • The data in the backend system is constantly being updated so the caches need to be updated every N minutes.
  • Requests to the backend system need to be throttled.

Here’s a possible solution taking advantage of Akka actors and Scala’s support for functions as first class objects.   The first piece of the puzzle is a CacheSystem which creates and queries EhCache caches:

class CacheSystem(name: String, updateIntervalMin: Int, cacheManager: CacheManager) {
  var caches = List.empty[Cache]
  val actorSystem = ActorSystem("cache_" + name)

  val DEFAULT_TTL_SEC = 86400   // 1 day

  def addCache(name: String, size: Int, ttlSeconds: Int = DEFAULT_TTL_SEC): Cache = {
    val config = new CacheConfiguration(name, size)
    config.setTimeToIdleSeconds(ttlSeconds)
    val cache = new Cache(config)
    cacheManager.addCache(cache)
    caches = cache :: caches
    cache
  }

  def createCacheActor(cacheName: String, cacheSize: Int, scheduleDelay: Duration, 
                       actorCreator: (Cache, CacheSystem) => Actor, 
                       ttlSeconds: Int = DEFAULT_TTL_SEC): ActorRef = {

    val cache = addCache(cacheName, cacheSize, ttlSeconds)
    val actor = actorSystem.actorOf(Props(actorCreator(cache, this)), 
                                    name = cacheName + "CacheActor")

    actorSystem.scheduler.schedule(scheduleDelay, updateIntervalMin minutes, 
                                   actor, UpdateCacheForNow)   
    if (!DateUtil.isNowInActiveBusinessDay) {
      actorSystem.scheduler.scheduleOnce(scheduleDelay, actor, 
                                         UpdateCacheForPreviousBusinessDay)
    }

    actor
  }

  def findCachedObject[T](key: String, cache: Cache, finder: () => T): Option[T] = {
    val element = cache.get(key)

    if (element == null) {
      findObjectForCache(key, cache, finder)
    } else {
      Some(element.getObjectValue.asInstanceOf[T])
    }
  }

  def findObjectForCache[T](key: String, cache: Cache, finder: () => T): Option[T] = {
    val domainObj = finder()
    if (domainObj != null) {
      val element = new Element(key, domainObj)
      cache.put(element)
      Some(domainObj)
    } else {
      None
    }
  }

  def clearAllCaches() {
    caches.foreach(_.removeAll())
  }
}

The createCacheActor method creates a cache and an actor, and schedules tasks to periodically update the cache. I decided to use actors for this because the actor system’s thread pool is a good way to meet the throttling requirement. In addition using the Akka API it is easy to have scheduled tasks send messages to actors.  createCacheActor takes a function of  (Cache, CacheSystem) => Actor to create the actor.   It then fills in those parameters to create the actor using the Akka actorOf method.

The findCachedObject and findObjectForCache methods take a finder function of () => T that handles the lookup of objects from the backend system.

Here is an example of the CacheSystem being used by the business logic layer:

class CachingBusinessService(bizService: BusinessService) extends BusinessService {
  implicit val timeout = Timeout(60 seconds)

  val service1CacheActor = 
    cacheSystem.createCacheActor("service1", DATE_CACHE_SIZE, 0 seconds, 
                                 new Service1CacheActor(_, _, bizService))
  // ... more actors created here

  def service1(date: Date, useFoo: Boolean): JList[Service1Result] = {
    val future = 
      service1CacheActor ? FindValue(new Service1Params(date, useFoo))
    Await.result(future, timeout.duration).asInstanceOf[JList[Service1Result]]
  }

  // ... more service methods
}

The CachingBusinessService is a caching implementation of the BusinessService interface. It creates CacheActors to service the requests.   To create the Service1CacheActor it passes a curried constructor to createCacheActor.

The caching implementation of service1 sends a FindValue message to the service1CacheActor, using the ? (ask) method which returns an Akka Future.  Then it waits for the result of the future and returns it to the caller.

Using Await.result should raise a red flag. You don’t want to block if you don’t have to (epsecially inside of an actor). However in this case the BusinessService is being called as part of a REST API served by a non-async HTTP server. Before the caching layer was introduced it would block waiting for the back end to to respond.

Here’s the code for the FindValue message and the Params that it contains.  Params are the parameters for the backend query. Each unique Params object corresponds to a cache entry so each Params subclass is responsible for generating the appropriate cache key.

object CacheActor {
  case class FindValue(params: Params)

  trait Params {
    def cacheKey: String
  }
}

In the next post I’ll describe the CacheActor class hierarchy.