{"id":263,"date":"2012-07-22T20:41:50","date_gmt":"2012-07-23T00:41:50","guid":{"rendered":"http:\/\/localhost\/wordpress\/?p=263"},"modified":"2013-01-21T23:18:09","modified_gmt":"2013-01-22T04:18:09","slug":"an-auto-updating-caching-system-part-2","status":"publish","type":"post","link":"https:\/\/sourcedelica.com\/blog\/2012\/07\/an-auto-updating-caching-system-part-2\/","title":{"rendered":"An Auto-Updating Caching System &#8211; part 2"},"content":{"rendered":"<p>In the <a href=\"\/blog\/2012\/07\/an-auto-updating-caching-system-part-1\/\">previous post<\/a> we imagined that we needed to build a caching system in front of a slow backend system. The cache needed to meet the following requirements:<\/p>\n<ul>\n<li>The data in the backend system is constantly being updated so the caches need to be updated every N minutes.<\/li>\n<li>Requests to the backend system need to be throttled.<\/li>\n<\/ul>\n<p><a href=\"http:\/\/doc.akka.io\/docs\/akka\/2.0.1\/scala\/actors.html\">Akka actors<\/a> looked like a good fit for the requirements. Each actor would handle a query for the backend system and cache the results. In part 1 I talked about the <code>CachingSystem<\/code> which created <code>CacheActor<\/code>s and provided helper methods for working with the caches. In this post I will cover the <code>CacheActor<\/code> class hierarchy.<\/p>\n<p>Here is the base <code>CacheActor<\/code> class:<\/p>\n<pre class=\"brush: scala\">abstract class CacheActor[V](cacheSystem: CacheSystem) extends Actor with Logging {\r\n  implicit val execContext: ExecutionContext = context.dispatcher\r\n\r\n  def findValueReceive: Receive = {\r\n    case FindValue(params) =&gt; findValueForSender(params, sender)\r\n  }\r\n\r\n  def findValueForSender(params: Params, sender: ActorRef) {\r\n    val key = params.cacheKey\r\n    val elem = cache.get(key)\r\n\r\n    if (elem != null) {\r\n      sender ! elem.getObjectValue.asInstanceOf[V]\r\n    } else {\r\n      Future { findObject(params) }.onComplete {\r\n        case Right(result) => result match {\r\n          case Some(value) => sender ! value\r\n          case None => sender ! Status.Failure(new Exception(\"findObject returned None for key=\" + key + \" cache=\" + cache.getName))\r\n        }\r\n        case Left(ex) => sender ! Status.Failure(ex)\r\n      }\r\n    }\r\n  }\r\n\r\n  def findObject(params: Params): Option[V] = {\r\n    cacheSystem.findObjectForCache(params.cacheKey, cache, \r\n                                   finder(params))\r\n  }\r\n\r\n  \/\/ Provided by subclasses\r\n  val cache: Cache\r\n  def finder(params: Params): () =&gt; V\r\n}\r\n\r\nobject CacheActor {\r\n  case class FindValue(params: Params)\r\n\r\n  trait Params {\r\n    def cacheKey: String\r\n  }\r\n}<\/pre>\n<p><a href=\"\/blog\/2012\/07\/an-auto-updating-caching-system-part-1\/\">Part 1<\/a>\u00a0showed an example of a\u00a0<code>CachingBusinessService<\/code>\u00a0sending a\u00a0<code>FindValue<\/code>\u00a0message to\u00a0a\u00a0<code>Service1CacheActor<\/code>\u00a0using the\u00a0<code>?<\/code>\u00a0(<code>ask<\/code>) method. \u00a0<code>findValueReceive<\/code>\u00a0handles <code>FindValue<\/code>\u00a0by\u00a0either returning a value from the cache or making a call to the backend (via <code>CacheSystem.findObjectForCache<\/code>) to get the value.<\/p>\n<p>Concrete\u00a0<code>CacheActors<\/code>\u00a0are responsible for implementing <code>finder<\/code> which returns a function to query the backend system.\u00a0The returned function is ultimately executed by <code>CacheSystem.findObjectForCache<\/code>.<\/p>\n<p>Part 1 also showed\u00a0<code>CacheSystem<\/code>\u00a0sending <code>UpdateCacheForNow<\/code> messages to periodically update cache values. <code>UpdateCacheForNow<\/code> is handled by a subclass of <code>CacheActor<\/code>,\u00a0<code>DateCacheActor<\/code>:<\/p>\n<pre class=\"brush: scala\">abstract class DateCacheActor[V](cacheSystem: CacheSystem) \r\n    extends CacheActor[V](cacheSystem) {\r\n\r\n  override def receive = findValueReceive orElse  {\r\n    case UpdateCacheForNow =&gt; updateCacheForNow()\r\n\r\n    case UpdateCacheForPreviousBusinessDay =&gt; updateCacheForPreviousBusinessDay()\r\n  }\r\n\r\n  def updateCacheForNow() {\r\n    val activeBusinessDay: Range[Date] = DateUtil.calcActiveBusinessDay\r\n    val start = activeBusinessDay.getStart\r\n    val now = new Date\r\n\r\n    \/\/ If today is a business day and now is within the business day, \r\n    \/\/ retrieve data from the backend and put in the cache\r\n    if (now.getTime &gt;= start.getTime &amp;&amp; \r\n        now.getTime &lt;= activeBusinessDay.getEnd.getTime)\r\n      updateCacheForDate(now)\r\n    } \r\n  }\r\n\r\n  def updateCacheForPreviousBusinessDay() {\r\n    updateCacheForDate(DateUtil.calcActiveBusinessDay.getStart)\r\n  }\r\n\r\n  def updateCacheForDate(date: Date) {\r\n    import DateCacheActor._    \/\/ Use separate thread pool\r\n    Future { findObject(new DateParams(date)) }\r\n  }\r\n}\r\n\r\nobject DateCacheActor {\r\n  \/\/ Update cache for the current time\r\n  case object UpdateCacheForNow  \r\n\r\n  \/\/ Update cache for previous business day   \r\n  case object UpdateCacheForPreviousBusinessDay\r\n\r\n  \/\/ updateCacheForDate() uses a separate thread pool to prevent scheduled tasks \r\n  \/\/ from interfering with user requests\r\n  val FUTURE_POOL_SIZE = 5\r\n  val FUTURE_QUEUE_SIZE = 20000\r\n\r\n  private lazy val ucfdThreadPoolExecutor = \r\n    new ThreadPoolExecutor(FUTURE_POOL_SIZE, FUTURE_POOL_SIZE, 1, TimeUnit.MINUTES, \r\n                           new ArrayBlockingQueue(FUTURE_QUEUE_SIZE, true))\r\n  implicit lazy val ucfdExecutionContext: ExecutionContext = \r\n    ExecutionContext.fromExecutor(ucfdThreadPoolExecutor)\r\n}<\/pre>\n<p>During non-business hours values <code>UpdateCacheForNow<\/code> messages are ignored and values from the previous business day are returned from the cache. \u00a0If the app is started during non-business hours an\u00a0<code>UpdateCacheForPreviousBusinessDay<\/code> message is scheduled to populate cache values for the previous business day.<\/p>\n<p>A separate thread pool is used to perform the backend system queries for the scheduled <code>UpdateCacheFor*<\/code> tasks. \u00a0 We don&#8217;t want them to interfere with user requests which are handled using the regular actor thread pool.<\/p>\n<p>Here is what a concrete <code>DateCacheActor<\/code> would look like, using the <code>Service1CacheActor<\/code> from part 1 as an example:<\/p>\n<pre class=\"brush: scala\">class Service1CacheActor(val cache: Cache, cacheSystem: CacheSystem, \r\n                         bizService: BusinessService) \r\n    extends DateCacheActor[JList[Service1Result]](cacheSystem) {\r\n\r\n  override def receive = super.receive\r\n\r\n  override def updateCacheForDate(date: Date) {\r\n    import DateCacheActor._\r\n    Future { findObject(new Service1Params(date, true)) }\r\n    Future { findObject(new Service1Params(date, false)) }\r\n  }\r\n\r\n  def finder(params: Params) = { () =&gt;\r\n    params match {\r\n      case p: Service1Params =&gt; bizService.service1(p.date, p.useFoo)\r\n      case _ =&gt; throw new IllegalArgumentException(\"...\") \r\n    }\r\n  }\r\n}\r\n\r\nclass Service1Params(date: Date, val useFoo: Boolean) extends DateParams(date) {\r\n  override def cacheKey = super.cacheKey + \":\" + useFoo\r\n}<\/pre>\n<p><code>Service1CacheActor<\/code>&#8216;s implementation of <code>updateCacheForDate<\/code> finds and caches the results of the <code>true<\/code> and <code>false<\/code> variations of the <code>BusinessService.service1<\/code> backend system call.<\/p>\n<p>If we wanted to cache another one of <code>BusinessService<\/code>&#8216;s methods\u00a0using the auto-updating caching system we would:<\/p>\n<ol>\n<li>Subclass <code>DateCacheActor<\/code>, implement <code>finder<\/code> and potentially override <code>updateCacheForDate<\/code>.<\/li>\n<li>Subclass <code>DateParams<\/code>, providing the parameters to the backend query, and override the <code>cacheKey<\/code> method.<\/li>\n<li>Call <code>createCacheActor<\/code>\u00a0again in <code>CachingBusinessService<\/code> to create the new <code>DateCacheActor<\/code> from #1, and write a cached version of the backend query method, sending <code>FindValue<\/code> to the new actor and waiting for the response.<\/li>\n<\/ol>\n","protected":false},"excerpt":{"rendered":"<p>In the previous post we imagined that we needed to build a caching system in front of a slow backend system. The cache needed to meet the following requirements: The data in the backend system is constantly being updated so the caches need to be updated every N minutes. Requests to the backend system need [&hellip;]<\/p>\n","protected":false},"author":2,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[5,3],"tags":[],"class_list":["post-263","post","type-post","status-publish","format-standard","hentry","category-akka","category-scala","entry"],"_links":{"self":[{"href":"https:\/\/sourcedelica.com\/blog\/wp-json\/wp\/v2\/posts\/263","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/sourcedelica.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/sourcedelica.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/sourcedelica.com\/blog\/wp-json\/wp\/v2\/users\/2"}],"replies":[{"embeddable":true,"href":"https:\/\/sourcedelica.com\/blog\/wp-json\/wp\/v2\/comments?post=263"}],"version-history":[{"count":66,"href":"https:\/\/sourcedelica.com\/blog\/wp-json\/wp\/v2\/posts\/263\/revisions"}],"predecessor-version":[{"id":522,"href":"https:\/\/sourcedelica.com\/blog\/wp-json\/wp\/v2\/posts\/263\/revisions\/522"}],"wp:attachment":[{"href":"https:\/\/sourcedelica.com\/blog\/wp-json\/wp\/v2\/media?parent=263"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/sourcedelica.com\/blog\/wp-json\/wp\/v2\/categories?post=263"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/sourcedelica.com\/blog\/wp-json\/wp\/v2\/tags?post=263"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}