In the previous post we imagined that we needed to build a caching system in front of a slow backend system. The cache needed to meet the following requirements:
- The data in the backend system is constantly being updated, so the caches need to be refreshed every N minutes.
- Requests to the backend system need to be throttled.
Akka actors looked like a good fit for the requirements. Each actor would handle a query for the backend system and cache the results. In part 1 I talked about the CacheSystem, which created CacheActors and provided helper methods for working with the caches. In this post I will cover the CacheActor class hierarchy.
Here is the base CacheActor class:
    // Appears to target Akka 2.0, where Future.onComplete receives an Either[Throwable, T].
    abstract class CacheActor[V](cacheSystem: CacheSystem) extends Actor with Logging {
      import CacheActor._ // FindValue and Params live in the companion object

      implicit val execContext: ExecutionContext = context.dispatcher

      def findValueReceive: Receive = {
        case FindValue(params) => findValueForSender(params, sender)
      }

      // sender is passed in explicitly so the Future callback does not read
      // the actor's sender reference after the message has been handled.
      def findValueForSender(params: Params, sender: ActorRef) {
        val key = params.cacheKey
        val elem = cache.get(key)

        if (elem != null) {
          // Cache hit: reply immediately
          sender ! elem.getObjectValue.asInstanceOf[V]
        } else {
          // Cache miss: query the backend off the actor's thread
          Future { findObject(params) }.onComplete {
            case Right(result) => result match {
              case Some(value) => sender ! value
              case None => sender ! Status.Failure(
                new Exception("findObject returned None for key=" + key + " cache=" + cache.getName))
            }
            case Left(ex) => sender ! Status.Failure(ex)
          }
        }
      }

      def findObject(params: Params): Option[V] = {
        cacheSystem.findObjectForCache(params.cacheKey, cache, finder(params))
      }

      // Provided by subclasses
      val cache: Cache
      def finder(params: Params): () => V
    }

    object CacheActor {
      case class FindValue(params: Params)

      trait Params {
        def cacheKey: String
      }
    }
Part 1 showed an example of a CachingBusinessService sending a FindValue message to a Service1CacheActor using the ? (ask) method. findValueReceive handles FindValue by either returning a value from the cache or making a call to the backend (via CacheSystem.findObjectForCache) to get the value.
Concrete CacheActors are responsible for implementing finder, which returns a function that queries the backend system. The returned function is ultimately executed by CacheSystem.findObjectForCache.
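To make the division of labour concrete, here is a minimal sketch without Akka. It assumes a plain in-memory map in place of the real cache, and it assumes that findObjectForCache always runs the finder and stores the result (part 1 has the real implementation, including throttling). FinderSketch, findValue, and backendCalls are hypothetical names used only for illustration.

```scala
import scala.collection.concurrent.TrieMap

object FinderSketch {
  // Hypothetical stand-in for the real cache: a thread-safe in-memory map.
  private val cache = TrieMap.empty[String, String]

  // Counts backend calls so the effect of caching is observable.
  var backendCalls = 0

  // Plays the role of a concrete actor's finder: it builds the query but does not run it.
  def finder(key: String): () => String = () => {
    backendCalls += 1
    "result-for-" + key
  }

  // Assumed shape of findObjectForCache: run the finder and store a non-null result.
  def findObjectForCache(key: String, finder: () => String): Option[String] = {
    val result = Option(finder()) // a null result becomes None
    result.foreach(value => cache.put(key, value))
    result
  }

  // Mirrors findValueForSender: serve from the cache, otherwise fall back to the backend.
  def findValue(key: String): Option[String] =
    cache.get(key).orElse(findObjectForCache(key, finder(key)))
}
```

Calling FinderSketch.findValue twice with the same key reaches the backend only once, while calling findObjectForCache directly always runs the query again. That second behaviour is what a scheduled refresh relies on.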
Part 1 also showed CacheSystem sending UpdateCacheForNow messages to periodically update cache values. UpdateCacheForNow is handled by DateCacheActor, a subclass of CacheActor:
    abstract class DateCacheActor[V](cacheSystem: CacheSystem) extends CacheActor[V](cacheSystem) {

      override def receive = findValueReceive orElse {
        case UpdateCacheForNow => updateCacheForNow()
        case UpdateCacheForPreviousBusinessDay => updateCacheForPreviousBusinessDay()
      }

      def updateCacheForNow() {
        val activeBusinessDay: Range[Date] = DateUtil.calcActiveBusinessDay
        val start = activeBusinessDay.getStart
        val now = new Date

        // If today is a business day and now is within the business day,
        // retrieve data from the backend and put in the cache
        if (now.getTime >= start.getTime && now.getTime <= activeBusinessDay.getEnd.getTime) {
          updateCacheForDate(now)
        }
      }

      def updateCacheForPreviousBusinessDay() {
        updateCacheForDate(DateUtil.calcActiveBusinessDay.getStart)
      }

      def updateCacheForDate(date: Date) {
        import DateCacheActor._ // Use separate thread pool
        Future { findObject(new DateParams(date)) }
      }
    }

    object DateCacheActor {
      // Update cache for the current time
      case object UpdateCacheForNow

      // Update cache for previous business day
      case object UpdateCacheForPreviousBusinessDay

      // updateCacheForDate() uses a separate thread pool to prevent scheduled tasks
      // from interfering with user requests
      val FUTURE_POOL_SIZE = 5
      val FUTURE_QUEUE_SIZE = 20000

      private lazy val ucfdThreadPoolExecutor =
        new ThreadPoolExecutor(FUTURE_POOL_SIZE, FUTURE_POOL_SIZE, 1, TimeUnit.MINUTES,
          new ArrayBlockingQueue(FUTURE_QUEUE_SIZE, true))

      implicit lazy val ucfdExecutionContext: ExecutionContext =
        ExecutionContext.fromExecutor(ucfdThreadPoolExecutor)
    }
During non-business hours, UpdateCacheForNow messages are ignored and values from the previous business day are returned from the cache. If the app is started during non-business hours, an UpdateCacheForPreviousBusinessDay message is scheduled to populate cache values for the previous business day.
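The business-hours check can be isolated as a small pure function, which makes it easy to test without a clock. This is only a sketch: BusinessDayWindow, isWithin, and dateToRefresh are hypothetical names, and the start and end of the business day are passed in rather than computed by DateUtil.

```scala
import java.util.Date

object BusinessDayWindow {
  // True when `now` lies in [start, end], both ends inclusive,
  // which is the comparison updateCacheForNow performs.
  def isWithin(now: Date, start: Date, end: Date): Boolean =
    now.getTime >= start.getTime && now.getTime <= end.getTime

  // The date to refresh, or None when the update message should be ignored.
  def dateToRefresh(now: Date, start: Date, end: Date): Option[Date] =
    if (isWithin(now, start, end)) Some(now) else None
}
```

With a window from 09:00 to 17:00, a call at noon yields Some(now) and a call at 20:00 yields None, so the cache keeps serving whatever was stored earlier.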
A separate thread pool is used to perform the backend system queries for the scheduled UpdateCacheFor* tasks. We don't want them to interfere with user requests, which are handled using the regular actor thread pool.
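The following sketch shows the same isolation idea using only the Scala standard library (scala.concurrent rather than the Akka futures used in the article). The pool and queue sizes copy the constants above. The thread names, the UpdatePoolSketch object, and the explicit passing of the execution context are my own assumptions for illustration.

```scala
import java.util.concurrent.{ArrayBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object UpdatePoolSketch {
  // Names the worker threads so it is easy to see which pool ran a task.
  // Daemon threads keep this demo from blocking JVM exit.
  private val threadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "cache-update-" + counter.incrementAndGet())
      t.setDaemon(true)
      t
    }
  }

  // Fixed-size pool with a bounded, fair queue, sized like the article's constants.
  private val executor = new ThreadPoolExecutor(
    5, 5, 1, TimeUnit.MINUTES,
    new ArrayBlockingQueue[Runnable](20000, true),
    threadFactory)

  val updateContext: ExecutionContext = ExecutionContext.fromExecutor(executor)

  // Runs a task on the dedicated pool and reports the name of the thread that ran it.
  def runOnUpdatePool(): String =
    Await.result(Future(Thread.currentThread().getName)(updateContext), 5.seconds)
}
```

Passing the context explicitly, instead of importing an implicit, leaves no doubt about which pool a given Future uses when more than one ExecutionContext is in scope.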
Here is what a concrete DateCacheActor would look like, using the Service1CacheActor from part 1 as an example:
    class Service1CacheActor(val cache: Cache, cacheSystem: CacheSystem, bizService: BusinessService)
        extends DateCacheActor[JList[Service1Result]](cacheSystem) {

      override def receive = super.receive

      override def updateCacheForDate(date: Date) {
        import DateCacheActor._
        Future { findObject(new Service1Params(date, true)) }
        Future { findObject(new Service1Params(date, false)) }
      }

      def finder(params: Params) = { () =>
        params match {
          case p: Service1Params => bizService.service1(p.date, p.useFoo)
          case _ => throw new IllegalArgumentException("...")
        }
      }
    }

    class Service1Params(date: Date, val useFoo: Boolean) extends DateParams(date) {
      override def cacheKey = super.cacheKey + ":" + useFoo
    }
Service1CacheActor's implementation of updateCacheForDate finds and caches the results of the true and false variations of the BusinessService.service1 backend system call.
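The two variations can share one cache only because their keys differ. The sketch below illustrates this. DateParams is not shown in this post, so the version here, which keys on the UTC calendar day, is purely an assumption. Service1Params follows the listing above.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

object CacheKeySketch {
  trait Params { def cacheKey: String }

  // Hypothetical DateParams: keying on the UTC calendar day is an assumption.
  class DateParams(val date: Date) extends Params {
    def cacheKey: String = {
      val fmt = new SimpleDateFormat("yyyy-MM-dd")
      fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
      fmt.format(date)
    }
  }

  // As in the article: append useFoo so each variation gets its own cache entry.
  class Service1Params(date: Date, val useFoo: Boolean) extends DateParams(date) {
    override def cacheKey: String = super.cacheKey + ":" + useFoo
  }
}
```

For the epoch date new Date(0L), the two keys are "1970-01-01:true" and "1970-01-01:false", so neither result overwrites the other.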
If we wanted to cache another one of BusinessService's methods using the auto-updating caching system, we would:
1. Subclass DateCacheActor, implement finder, and potentially override updateCacheForDate.
2. Subclass DateParams, providing the parameters to the backend query, and override the cacheKey method.
3. Call createCacheActor again in CachingBusinessService to create the new DateCacheActor from #1, and write a cached version of the backend query method that sends FindValue to the new actor and waits for the response.
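As a rough illustration of the first two steps, here is a sketch for a made-up service2 method. Everything in it is hypothetical: service2, Service2Params, the region parameter, and the simplified string-based DateParams. The actor plumbing is left out, so only the finder and the parameter class remain, and step 3 is not shown.

```scala
object NewCacheSketch {
  trait Params { def cacheKey: String }

  // Simplified stand-in for DateParams that keys on a day string.
  class DateParams(val day: String) extends Params {
    def cacheKey: String = day
  }

  // Step 2: parameters for the new query, with a key that distinguishes regions.
  class Service2Params(businessDay: String, val region: String) extends DateParams(businessDay) {
    override def cacheKey: String = super.cacheKey + ":" + region
  }

  // Hypothetical backend method we want to cache.
  trait BusinessService {
    def service2(day: String, region: String): Int
  }

  // Step 1, reduced to the finder a DateCacheActor subclass would have to implement.
  class Service2Finder(bizService: BusinessService) {
    def finder(params: Params): () => Int = () => params match {
      case p: Service2Params => bizService.service2(p.day, p.region)
      case other => throw new IllegalArgumentException("Unexpected params: " + other.cacheKey)
    }
  }
}
```

The pattern match keeps the finder honest: if params of the wrong type reach the actor, the returned function fails with an IllegalArgumentException when it is run, rather than silently querying with the wrong arguments.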