In our final post in our series about Akka, we’re going to cover a common pattern we used in building our backend: pulling work. This pattern is not our creation; our work here is largely based upon work done by the Akka team (including the code itself). This post is intended to explain the motivation and benefits of this pattern, why we find it so useful at Conspire, and why we think it is a necessity in a clustered environment.
Pushing work is simple
Pushing work in Akka is very simple and is the right place to start. Don’t try to optimize before you’ve identified your bottlenecks. Pulling work should not be your default. We use pulling in certain situations:
- Dispatching work to remote nodes
- Concerns about stability of worker actors
- Specific control over the amount of work done concurrently
These three situations are where pushing begins to cause problems. Essentially, tracking work becomes more difficult, failures become harder to recover from, and concurrency becomes more complicated to reason about. I’ll go into detail on each of these problems.
The problem with routers
Routers are the most obvious way to concurrently process work in Akka. They’re seemingly great—change some config settings and bam! Instant concurrency. For some tasks, this is adequate but the simplicity comes at a cost: routers can create blind spots in your architecture. Blindly pushing work to a router relinquishes control to the underlying router implementation. There is no built-in way to know which units of work were received by which worker, nor can that be reliably tracked. Once a message is sent to a router it is placed in a routee’s mailbox but the sender has no way of knowing which mailbox. This can cause problems, especially when used with remote routees.
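For reference, the “config settings” in question look something like the snippet below. This is an illustrative Akka 2.x-era deployment fragment; the `/workRouter` path and instance count are made up for the example:

```
akka.actor.deployment {
  /workRouter {
    router = round-robin
    nr-of-instances = 10
  }
}
```

With that in place, creating the actor at path `/workRouter` yields a round-robin router with ten routees and no code changes.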
Accounting for work is difficult and unreliable
As noted, routers make work more difficult to track. Imagine the following scenario: 1000 work units are sent to a RoundRobinRouter using 10 remote routees. Each routee receives 100 messages. One of your remote nodes fails and consequently, one of your routees is now dead. The dead routee may still have had an unknown number of messages in its mailbox, and we have no way of knowing which messages now need to be reprocessed.
When using remote workers in a situation like this, we could (and probably should!) send an acknowledgement back to the sender when the worker begins working on a unit of work. But acking doesn’t help if a message hasn’t been processed yet: no acknowledgement will have been sent, and those messages are now lost. Depending on your use case, this may or may not be a problem.
We could track all dispatched work and assume a unit of work is lost if no acknowledgement is received within a certain time, but this complicates our code when the real problem is inherent to pushing work.
Controlling concurrency is complicated
Pushing work can also complicate the task of controlling concurrency. When working in the cloud, CPU resources are at a premium. What works great on your quad-core dev machine may fall over on a relatively anemic VPS instance. We ran into this problem several times as we moved our backend into production on AWS and term it “rogue concurrency.”
This problem doesn’t manifest itself if your worker actors do all their processing within their receive function—no futures, no child actors. Controlling concurrency in such a situation is just a matter of managing the number of worker actors and perhaps tweaking their dispatcher. The issue becomes far more complicated when your worker actors have a series of child actors or futures that are part of their processing.
Imagine your worker actor first must fetch data from an external REST service using a non-blocking client (we use Play’s web service library). The worker actor will send off the request and receive a future, continuing work in the future’s callback. As soon as that future is created, the worker actor moves on to the next message. This could lead to a major problem: if that worker suddenly receives a large number of work requests and fires off more REST requests than it can handle, you might end up with an OutOfMemoryError due to too much data coming back from the external service. Or, if the processing is CPU intensive, you could render your node unresponsive due to CPU thrashing.
There are certainly workable techniques for controlling concurrency while still pushing work but in my opinion, pulling is far more elegant and much easier to reason about.
Our architecture is largely adapted from the blog post linked above, and that post should be read before continuing.
Here’s our implementation. Note that each of the IMAP, analytics and mailer services implements the work pulling pattern:
We use slightly different terminology. A Leader represents the master coordinator for a given service; that service has member Nodes, each of which does its work inside a Processor actor. All work requests and responses are wrapped in messages inheriting from a common base of Start/Acknowledged/Completed/Failed traits, allowing us to use this pattern generically across our backend. The Pipeline Manager uses a similar pattern to manage tasks at a high level across our various backend services.
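The shared message protocol can be sketched roughly as below. This is an illustrative model, not Conspire’s actual code: the field names are assumptions, and the Start/Acknowledged/Completed/Failed types are modeled as case classes for brevity rather than as traits to be mixed in.

```scala
// Hypothetical sketch of a generic work-message protocol with a common base.
// Field names and payload types are assumptions for illustration only.
sealed trait WorkMessage
case class Start(workId: String) extends WorkMessage
case class Acknowledged(workId: String) extends WorkMessage
case class Completed(workId: String, result: String) extends WorkMessage
case class Failed(workId: String, reason: String) extends WorkMessage

// Services can pattern-match on the common base to handle any work type
// generically, regardless of which backend service produced the message.
def describe(msg: WorkMessage): String = msg match {
  case Start(id)        => s"start $id"
  case Acknowledged(id) => s"ack $id"
  case Completed(id, _) => s"done $id"
  case Failed(id, why)  => s"failed $id: $why"
}
```

Because every service speaks this one protocol, the Leader/Node machinery never needs to know what the work actually is.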
To summarize, work is sent to a Leader which holds a queue of both work and workers requesting work. In our implementation, the Leader spawns workers using a cluster-aware router but never sends messages to that router—the router exists only so that the creation of remote routees is handled automatically by Akka. Nodes are sent WorkIsReady messages whenever work becomes available. In return, a node requests work and is sent a unit of work if one is available. As outlined in the Akka post, this is entirely event driven; no polling is required.
How does this pattern fix these problems?
Pulling largely eliminates the problems outlined above. Because a specific actor must pull work from the coordinator, the coordinator always knows which unit of work each worker has. Failure recovery becomes much simpler: if a worker dies, the coordinator knows which unit of work to reprocess (or quarantine for inspection). Messages don’t sit in the workers’ mailboxes, so the loss of those mailboxes isn’t an issue; the coordinator keeps its own queue. Since each worker will only request more work once it completes its current task, there are no concerns about a worker receiving or starting more work than can be handled concurrently. The use of futures or child actors won’t lead to rogue concurrency. Let’s explore these benefits in more detail.
Accounting can be synchronous
As demonstrated, pushing work makes the tracking of each piece of work more difficult which in turn complicates failure recovery. There is no reliable method of knowing where a given piece of work is once it has been sent: tracking can only be done once your worker sends back an acknowledgement, leaving a hole in your failure recovery strategy.
By contrast, accounting for work becomes trivial when using pulling. As you’ll see in the code samples below, a worker actor must specifically request more work. Work is only sent when there is both a worker requesting work and work ready for processing. Under those conditions, the work queue and worker queue are both dequeued, allowing the leader to track the worker receiving this particular unit of work. By using Akka’s built-in DeathWatch, we will be notified if the worker dies and we will know exactly which piece of work needs reprocessing.
Obviously the leader itself still has to hold a queue of work—if the leader should die, that queue will be lost. Pulling doesn’t eliminate that problem but it does centralize it so that work is only queued in one place. Past that, your own use case and requirements will dictate how recovery of a failed leader should proceed.
Failure recovery is easier
Stemming from simplified work accounting, failure recovery becomes far easier. Should a node die—and its workers along with it—work will return to the work queue until workers become available. In our implementation, failed nodes are restarted automatically if they fall out of the cluster or the JVM dies. Once the actor system is back up, the cluster-aware router will create new workers which will register themselves with the leader. By using this pattern, our backend is able to heal itself (with a little help from Ubuntu’s upstart utility) without our intervention.
Concurrency is easier to control
Our workers often have to fetch multiple pieces of data asynchronously and then perform some fairly CPU intensive tasks on that data. Because of this, we can easily starve ourselves of CPU or memory resources when running on VPS instances. We have to take care not to do too much work at once or we run the risk of Linux’s out-of-memory killer killing our JVM, or of rendering the JVM unresponsive and causing it to fall out of the cluster. Pulling makes this much easier to manage.
By configuring the cluster-aware router to only create a certain number of instances per node, we can confidently cap the amount of work done concurrently. A worker will only request more work once its task is complete, regardless of how that task is implemented within the Processor. This frees us from concerns about futures and child actors, we aren’t beholden to the implementation of the Processor actor. Switching to this pattern improved the stability of our backend immeasurably. Before, we had nodes crash fairly often due to heavy load. After, no issues whatsoever. Pulling work makes concurrency control not just easier to implement but easier to *reason about*. We could come up with a series of one-off solutions for controlling the amount of work in each specific worker based on its implementation, effectively reducing ourselves to locks and semaphores—or we can switch to pulling and unify this control structure. We keep our backend DRY and much, much simpler.
Routers still serve a specific purpose
As noted above, routers can lead to unforeseen issues but they still serve a significant purpose. We use cluster-aware routers to manage the creation of workers on remote nodes but we never use the router’s actual reference. This allows us to spin up worker nodes as blank canvases onto which the cluster-aware router creates workers. Routers are still useful in a number of situations but those are out of scope for this post.
Each of the problems outlined above can be solved individually while still pushing work but pulling allows us to elegantly solve all three while retaining a simple, unified approach to dispatching work.
How do we implement this?
By and large, our implementation is identical to the code in the Akka blog post linked at the beginning of this post. Our version is generic: a Leader/Node pair can be created by simply subclassing the two classes presented here. In our case, a Leader is created on our supervisor node and Node actors are created remotely on worker nodes. E.g., the supervisor node has an AnalyticsLeader which will create a certain number of AnalyticsNodes on each Akka node with the analytics role, based on the configuration of the cluster-aware router. (Akka devs: we hope you don’t mind that the bulk of this code is from your blog post.)
A few notes: the Facade is just an actor that tracks the location of the supervisor within the cluster. The implementation of Processors is left out because this pattern doesn’t rely on the implementation of the processor, so long as it adheres to our message protocol. That protocol is available in the full gist.
This is our Leader class. It’s parameterized on the type of work and Node it manages.
We then create the cluster-aware router based on the configuration.
We create our work queue and workers map. The workers map tracks all workers and uses Option to track each worker’s current state. If the value for a given worker is None, that worker is ready for work; otherwise we store both the unit of work and its original requester so that we can route replies appropriately.
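That bookkeeping can be sketched without any Akka at all. The following is an illustrative, Akka-free model (not the actual Leader code): workers and requesters are plain strings standing in for ActorRefs.

```scala
import scala.collection.mutable

// Illustrative model of the leader's bookkeeping; names are assumptions.
case class Work(id: String)

// A worker maps to None when idle, or Some((work, requester)) when busy.
val workers = mutable.Map.empty[String, Option[(Work, String)]]

def registerWorker(worker: String): Unit =
  workers(worker) = None // newly registered workers start out idle

def isIdle(worker: String): Boolean =
  workers.get(worker).contains(None)
```

Keeping the requester alongside the work is what lets the leader route the eventual reply without the worker ever knowing who asked.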
This function is used to notify nodes that more work is available. You’ll see how it is used in the Node class later on. If work is available, we notify each worker that we believe isn’t already busy.
Upon creation, nodes immediately register themselves with the leader and are added to the worker queue. We also register for DeathWatch on the worker using context.watch so that we will receive a Terminated message if the worker dies.
When a worker requests work, we dequeue the work queue if work is available and send it to the worker, tracking both the original requester and the worker.
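The dispatch step can be modeled as below. This is an illustrative, Akka-free sketch under assumed names, not the gist’s code; in the real Leader the dispatch would be a `worker ! work` send.

```scala
import scala.collection.mutable

// Illustrative sketch of dispatching work only when both a queued unit of
// work and an idle, registered worker exist. Names are assumptions.
case class Work(id: String)

val workQueue = mutable.Queue.empty[(Work, String)] // (work, original requester)
val workers = mutable.Map.empty[String, Option[(Work, String)]]

// Called when `worker` requests work.
def dispatch(worker: String): Option[Work] =
  if (workQueue.nonEmpty && workers.get(worker).contains(None)) {
    val (work, requester) = workQueue.dequeue()
    workers(worker) = Some((work, requester)) // leader knows exactly who has what
    Some(work)                                // in Akka: worker ! work
  } else None
```

Because a busy worker maps to `Some(...)`, a second request from it (or a request with an empty queue) dispatches nothing.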
When work is done, we send whatever message was included to the original requester and mark this node as idle.
If a node dies, we want to know about it. Akka will send a Terminated message to all actors watching the now-dead actor. In this case, we send that worker’s in-flight work back to the Leader for reprocessing and remove the worker so it isn’t reused.
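The recovery step is small enough to sketch in full. Again, this is an Akka-free illustration with assumed names; `onTerminated` plays the role of the Leader’s handler for Akka’s Terminated message.

```scala
import scala.collection.mutable

// Illustrative sketch: when a watched worker dies, requeue its in-flight
// work (if any) and drop the worker so it is never handed work again.
case class Work(id: String)

val workQueue = mutable.Queue.empty[Work]
val workers = mutable.Map.empty[String, Option[Work]]

def onTerminated(worker: String): Unit = {
  workers.get(worker).flatten.foreach(work => workQueue.enqueue(work))
  workers -= worker
}
```

Note that an idle dead worker (mapped to None) contributes nothing to the queue; only genuinely in-flight work is reprocessed.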
The remaining code for the Leader is available in the full gist and won’t make much sense outside the context of our architecture but it does demonstrate queuing work.
Let’s move on to the Node class. Here we make use of Akka’s ability to change the behavior of an actor dynamically. Our nodes have two states: working and idle. We use context.become to switch between the two states as needed. In our implementation of this pattern, Processor actors are created as-needed for each work request and never reused. This may or may not be appropriate depending on your use case; in our experience, creating a new Processor for each request can help reduce leaks.
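The two-state behavior switch can be modeled outside Akka with a mutable partial function. This is a sketch of the idea only: in the real actor, `become` is `context.become` and `receive` is the actor’s message handler.

```scala
// Akka-free model of a two-state Node. Message and return values are
// illustrative strings; a real actor would send messages instead.
class Node {
  private var state: PartialFunction[Any, String] = idle

  private def idle: PartialFunction[Any, String] = {
    case "WorkIsReady" => become(working); "requested work" // ask leader for work
  }

  private def working: PartialFunction[Any, String] = {
    case "WorkIsReady"  => "ignored"                  // already busy: ignore it
    case "WorkComplete" => become(idle); "idle again" // ready for the next unit
  }

  // Stand-in for context.become: swap the active message handler.
  private def become(next: PartialFunction[Any, String]): Unit = state = next

  def receive(msg: Any): String = state(msg)
}
```

The idle handler requests work exactly once per notification, and the working handler drops further notifications until the current task completes.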
Depending on the node’s current state, we respond to work notifications from the Leader with work requests or we ignore the notification entirely. Note that we don’t respond to the leader directly: we route all responses through our Facade so that we can continue working even if the supervisor dies and is restarted on a different node in the cluster.
How do we actually use these two classes? Simple.
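Roughly, the wiring looks like the sketch below. The base class here is a stand-in for the generic Leader from the gist (its real shape differs); only the subclassing step matches the post.

```scala
// Hypothetical stand-in for the generic Leader base class; the real one
// dispatches queued work to pulling Nodes rather than just queueing it.
abstract class Leader[W](val serviceName: String) {
  private var queue = List.empty[W]
  def submit(work: W): Unit = queue = queue :+ work
  def pending: Int = queue.size
}

case class AnalyticsWork(accountId: String)

// All pulling/dispatch logic lives in the base class; the subclass only
// fixes the work type and service name.
class AnalyticsLeader extends Leader[AnalyticsWork]("analytics")

val leader = new AnalyticsLeader
leader.submit(AnalyticsWork("acct-42"))
```

The point is that each new service costs one small subclass pair, not a reimplementation of the pattern.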
That’s it, the pulling pattern for whatever work we need! Instantiate the Leader at startup and start passing work to it. Feel free to use the Leader and Node classes provided here—they will likely need modification to work for your specific use case. We didn’t create this pattern but hopefully our version will spark some ideas on how pulling can improve your Akka backend.
This is the final post in our series on how we use Akka at Conspire. If you’re using Akka already, we hope that you can learn from our mistakes. If you aren’t already using Akka, we hope that this series has shown you how Akka can help build a better backend. We’re definitely fans and we’re very excited about the direction Akka and Scala are headed. Happy hacking!