Imagine you needed to build a caching system in front of a slow backend system with the following requirements:
- The data in the backend system is constantly being updated, so the caches need to be updated every N minutes.
- Requests to the backend system need to be throttled.
Here’s a possible solution that takes advantage of Akka actors and Scala’s support for functions as first-class objects. The first piece of the puzzle is a `CacheSystem`, which creates and queries EhCache caches:
```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import net.sf.ehcache.{Cache, CacheManager, Element}
import net.sf.ehcache.config.CacheConfiguration
import scala.concurrent.duration._

// UpdateCacheForNow, UpdateCacheForPreviousBusinessDay and DateUtil are defined elsewhere in the project
class CacheSystem(name: String, updateIntervalMin: Int, cacheManager: CacheManager) {
  var caches = List.empty[Cache]
  val actorSystem = ActorSystem("cache_" + name)
  // Implicit ExecutionContext required by scheduler.schedule / scheduleOnce
  import actorSystem.dispatcher

  val DEFAULT_TTL_SEC = 86400 // 1 day

  def addCache(name: String, size: Int, ttlSeconds: Int = DEFAULT_TTL_SEC): Cache = {
    val config = new CacheConfiguration(name, size)
    // Applied as time-to-idle: entries expire after ttlSeconds without being accessed
    config.setTimeToIdleSeconds(ttlSeconds)
    val cache = new Cache(config)
    cacheManager.addCache(cache)
    caches = cache :: caches
    cache
  }

  def createCacheActor(cacheName: String,
                       cacheSize: Int,
                       scheduleDelay: FiniteDuration, // the scheduler requires a FiniteDuration
                       actorCreator: (Cache, CacheSystem) => Actor,
                       ttlSeconds: Int = DEFAULT_TTL_SEC): ActorRef = {
    val cache = addCache(cacheName, cacheSize, ttlSeconds)
    val actor = actorSystem.actorOf(Props(actorCreator(cache, this)), name = cacheName + "CacheActor")
    // Refresh the cache every updateIntervalMin minutes
    actorSystem.scheduler.schedule(scheduleDelay, updateIntervalMin.minutes, actor, UpdateCacheForNow)
    if (!DateUtil.isNowInActiveBusinessDay) {
      // Outside business hours, also load the previous business day's data once
      actorSystem.scheduler.scheduleOnce(scheduleDelay, actor, UpdateCacheForPreviousBusinessDay)
    }
    actor
  }

  def findCachedObject[T](key: String, cache: Cache, finder: () => T): Option[T] = {
    val element = cache.get(key)
    if (element == null) {
      // Cache miss: load from the backend and cache the result
      findObjectForCache(key, cache, finder)
    } else {
      Some(element.getObjectValue.asInstanceOf[T])
    }
  }

  def findObjectForCache[T](key: String, cache: Cache, finder: () => T): Option[T] = {
    val domainObj = finder()
    if (domainObj != null) {
      val element = new Element(key, domainObj)
      cache.put(element)
      Some(domainObj)
    } else {
      None
    }
  }

  def clearAllCaches(): Unit = {
    caches.foreach(_.removeAll())
  }
}
```
The `createCacheActor` method creates a cache and an actor, then schedules an `UpdateCacheForNow` message every `updateIntervalMin` minutes (plus a one-off `UpdateCacheForPreviousBusinessDay` message when it runs outside an active business day). I decided to use actors for this because the actor system’s bounded thread pool limits how many backend requests can run at once, which meets the throttling requirement. In addition, the Akka scheduler makes it easy to have scheduled tasks send messages to actors. `createCacheActor` takes a function of type `(Cache, CacheSystem) => Actor`; it applies that function to the new cache and to itself inside `Props`, and passes the result to Akka’s `actorOf` method to create the actor.
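Part of that throttling comes from how actors run: each actor processes one message at a time, and actors share a dispatcher backed by a bounded thread pool. The stdlib-only sketch below isolates the bounded-pool idea, with a plain fixed-size pool standing in for the dispatcher (this is not Akka; the pool size of 2, `ThrottleDemo` and `slowBackendCall` are hypothetical). However many calls are submitted, at most two run at once:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical demo: a fixed pool stands in for the actor system's dispatcher.
class ThrottleDemo(poolSize: Int) {
  private val lock = new Object
  private var inFlight = 0 // backend calls currently running
  private var peak = 0     // highest value inFlight ever reached

  // Hypothetical stand-in for a slow backend request.
  private def slowBackendCall(i: Int): Int = {
    lock.synchronized { inFlight += 1; peak = math.max(peak, inFlight) }
    Thread.sleep(50) // simulate backend latency
    lock.synchronized { inFlight -= 1 }
    i * 10
  }

  // Submit n calls at once; the pool size caps how many run concurrently.
  def run(n: Int): (List[Int], Int) = {
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val results =
        Await.result(Future.traverse((1 to n).toList)(i => Future(slowBackendCall(i))), 10.seconds)
      (results, lock.synchronized(peak))
    } finally pool.shutdown()
  }
}

val (results, observedPeak) = new ThrottleDemo(poolSize = 2).run(10)
println(s"peak concurrent backend calls: $observedPeak")
```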
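The `findCachedObject` and `findObjectForCache` methods take a `finder` function of type `() => T` that looks the object up in the backend system. `findCachedObject` returns the cached value when there is one; otherwise it delegates to `findObjectForCache`, which calls the finder and caches any non-null result.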
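The get-or-load pattern behind those two methods can be sketched without EhCache. In this minimal version a `TrieMap` stands in for the EhCache `Cache` (an assumption made to keep the sketch dependency-free), and `SimpleCache` and `loadFromBackend` are hypothetical names. It shows that the finder only runs on a miss and that a `null` result is not cached:

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical cache; a TrieMap stands in for the EhCache Cache.
class SimpleCache {
  private val store = TrieMap.empty[String, Any]

  // Same shape as findCachedObject/findObjectForCache: return the cached value,
  // or call the finder, cache a non-null result, and return it.
  def findCachedObject[T](key: String, finder: () => T): Option[T] =
    store.get(key) match {
      case Some(v) => Some(v.asInstanceOf[T])
      case None =>
        val obj = finder()
        if (obj != null) { store.put(key, obj); Some(obj) } else None
    }
}

var backendCalls = 0
// Hypothetical backend lookup that counts how often it is called.
def loadFromBackend(id: String): String = { backendCalls += 1; s"value-for-$id" }

val cache = new SimpleCache
val first = cache.findCachedObject("k1", () => loadFromBackend("k1"))  // miss: calls the backend
val second = cache.findCachedObject("k1", () => loadFromBackend("k1")) // hit: served from the cache
val missing = cache.findCachedObject[String]("k2", () => null)         // null is not cached
```

Passing the lookup as a function is what lets one generic method serve every backend query: the caller decides how to load the object, and the cache decides whether loading is needed at all.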
Here is an example of the `CacheSystem` being used by the business logic layer:
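```scala
import akka.pattern.ask
import akka.util.Timeout
import java.util.{Date, List => JList}
import scala.concurrent.Await
import scala.concurrent.duration._
import CacheActor.FindValue

// cacheSystem, DATE_CACHE_SIZE, Service1CacheActor, Service1Params and
// Service1Result are defined elsewhere in the project
class CachingBusinessService(bizService: BusinessService) extends BusinessService {
  implicit val timeout: Timeout = Timeout(60.seconds)

  val service1CacheActor = cacheSystem.createCacheActor("service1", DATE_CACHE_SIZE, 0.seconds,
    new Service1CacheActor(_, _, bizService))
  // ... more actors created here

  def service1(date: Date, useFoo: Boolean): JList[Service1Result] = {
    val future = service1CacheActor ? FindValue(new Service1Params(date, useFoo))
    Await.result(future, timeout.duration).asInstanceOf[JList[Service1Result]]
  }

  // ... more service methods
}
```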
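The `CachingBusinessService` is a caching implementation of the `BusinessService` interface. It creates `CacheActor`s to service the requests. To create the `Service1CacheActor`, it passes `new Service1CacheActor(_, _, bizService)` to `createCacheActor`: a partially applied constructor, which Scala expands into a function of type `(Cache, CacheSystem) => Actor`.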
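To see what that placeholder syntax produces, here is a small stdlib-only sketch. The classes are hypothetical, Akka-free stand-ins with the same shapes as the post's types; a real Akka actor cannot be instantiated with `new` outside `Props`/`actorOf`, which is exactly why `createCacheActor` only invokes the function inside `Props`:

```scala
// Hypothetical stand-ins for the post's types; only their shapes matter here.
class Cache(val name: String)
class CacheSystem
trait Actor
class BusinessService
class Service1CacheActor(val cache: Cache, val system: CacheSystem, val service: BusinessService)
  extends Actor

val bizService = new BusinessService

// `new Service1CacheActor(_, _, bizService)` expands to
// (c, s) => new Service1CacheActor(c, s, bizService)
val actorCreator: (Cache, CacheSystem) => Actor = new Service1CacheActor(_, _, bizService)

// What createCacheActor does with it: supply the two missing arguments.
val actor = actorCreator(new Cache("service1"), new CacheSystem)
```

`bizService` is captured when the function is created, while the cache and the cache system are supplied later by `createCacheActor`.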
The caching implementation of `service1` sends a `FindValue` message to `service1CacheActor` using the `?` (`ask`) method, which returns a `Future`. It then waits for the result of the future and returns it to the caller.
Using `Await.result` should raise a red flag: you don’t want to block if you don’t have to (especially inside an actor). In this case, however, the `BusinessService` is called as part of a REST API served by a non-async HTTP server, so the request thread is blocked either way. Before the caching layer was introduced, it blocked waiting for the backend to respond.
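Because `?` replies with an untyped future, the code above casts the awaited result with `asInstanceOf`. Where `ask` returns a standard `scala.concurrent.Future` (as it does from Akka 2.1 on), `mapTo` is a common alternative: it does the same runtime class check, but inside the future, so the value is already typed before anything blocks on it. This sketch is stdlib-only; `fakeAsk` is a hypothetical stand-in for the actor’s reply, not an Akka call:

```scala
import java.util.{Arrays, List => JList}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in for `service1CacheActor ? FindValue(...)`: like ask,
// it returns an untyped Future[Any] (here one that is already completed).
def fakeAsk(msg: Any): Future[Any] =
  Future.successful(Arrays.asList(s"result for $msg"))

val timeout = 60.seconds

// mapTo checks the erased runtime class (java.util.List), just like asInstanceOf;
// the element type is not checked.
val typed: Future[JList[String]] = fakeAsk("FindValue").mapTo[JList[String]]
val value: JList[String] = Await.result(typed, timeout)

// A reply of an unexpected type surfaces as a failed Future (ClassCastException).
val wrong = Try(Await.result(fakeAsk("x").mapTo[Integer], timeout))
```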
Here’s the code for the `FindValue` message and the `Params` it contains. `Params` holds the parameters for the backend query. Each unique `Params` object corresponds to one cache entry, so each `Params` subclass is responsible for generating the appropriate cache key.
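```scala
object CacheActor {
  // Message asking a cache actor for the value matching the given query parameters
  case class FindValue(params: Params)

  // Backend query parameters; each subclass derives the key of its cache entry
  trait Params {
    def cacheKey: String
  }
}
```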
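The post names `Service1Params(date, useFoo)` but does not show its body, so the following is only a hypothetical illustration of a `Params` subclass. The key format, the day granularity, and formatting the date in UTC are all assumptions. The point it shows is that every field that changes the backend result has to be part of the key, and that equal parameters must produce equal keys:

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

object CacheActor {
  case class FindValue(params: Params)
  trait Params { def cacheKey: String }
}
import CacheActor._

// Hypothetical implementation; the original Service1Params is not shown in the post.
class Service1Params(val date: Date, val useFoo: Boolean) extends Params {
  // Assumed day-level key; both fields affect the backend result, so both are included.
  def cacheKey: String = {
    // SimpleDateFormat is not thread-safe, so a new instance is created per call.
    val fmt = new SimpleDateFormat("yyyy-MM-dd")
    fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
    s"service1:${fmt.format(date)}:$useFoo"
  }
}

val d = new Date(0L) // 1970-01-01T00:00:00Z
val k1 = new Service1Params(d, useFoo = true).cacheKey
val k2 = new Service1Params(d, useFoo = false).cacheKey
val msg = FindValue(new Service1Params(new Date(0L), true))
```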
In the next post I’ll describe the `CacheActor` class hierarchy.