Building a Distributed System with Akka Remote Actors

At AddThis, we deployed our first production system written in Scala almost two years ago. Since then, a growing number of new applications have been built in this exciting language. Among the many native Scala libraries we have tried and adopted, Akka stands out as the most indispensable.

Akka is a library for building concurrent, scalable applications using the Actor Model. Its fault-tolerance model is heavily influenced by Erlang. In this post, I will talk about our experience using Akka (2.0.x) remote actors to build a distributed system, SAM.

SAM is an internal tool for managing the data processing pipeline for billions of records daily. Much of the big data heavy lifting is done with our home-grown Hydra system. SAM functions as the command and control center that submits jobs and runs queries to advance the workflow. It also needs to do intermediary and post-processing of a non-trivial amount of data returned by Hydra.

SAM is designed to be horizontally scalable, such that it can scale out when the underlying Hydra cluster scales out, or when its own workload requires more processing power. It consists of a master and multiple slaves. The master is responsible for task allocation, bookkeeping (DB access), and providing a RESTful API. The slaves are responsible for task execution, such as running hydra queries and aggregating results.

A typical workflow starts when the master receives a user request as illustrated in the sequence diagram below (the participants are Actors, and interactions are message exchanges between them):

  • TaskCreator creates a task based on the user request and sends it to the TaskQueue
  • TaskQueue enqueues the task as well as the sender–the TaskCreator’s ActorRef
  • TaskQueue dequeues a task and sends it to a Slave’s TaskReceiver, which has registered itself as a task consumer. TaskQueue actually doesn’t know or care about the locality of a consumer–it is just an ActorRef
  • TaskReceiver receives the task and sends it to the appropriate TaskProcessor, with the original sender as the result receiver
  • TaskProcessor executes the task. When processing is done, it sends the result back to the receiver–the TaskCreator that created the task at the beginning. Here again, the locality of the receiver is transparent to TaskProcessor
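Stripped of the Akka machinery, the queue's core bookkeeping can be sketched in plain Scala. This is an illustrative model, not SAM's actual code: `Task`, `submit`, and `dispatch` are invented names, and the type parameter `Ref` stands in for Akka's `ActorRef` — the point is that the queue pairs each task with its creator and never cares where a consumer lives.

```scala
import scala.collection.mutable

// Simplified model of TaskQueue's bookkeeping: each task is queued together
// with a reference to its creator, so the eventual result can be routed back.
// `Ref` stands in for Akka's ActorRef; locality is irrelevant to the queue.
case class Task(id: Int, payload: String)

class TaskQueue[Ref] {
  private val tasks = mutable.Queue.empty[(Task, Ref)] // task + its creator
  private val consumers = mutable.Queue.empty[Ref]     // registered TaskReceivers

  def registerConsumer(consumer: Ref): Unit = consumers.enqueue(consumer)

  def submit(task: Task, creator: Ref): Unit = tasks.enqueue((task, creator))

  // Hand one task to the next registered consumer, if both exist. The
  // consumer processes the task and sends the result straight back to the
  // creator, which is why the creator's reference travels with the task.
  def dispatch(): Option[(Ref, Task, Ref)] =
    if (tasks.nonEmpty && consumers.nonEmpty) {
      val (task, creator) = tasks.dequeue()
      Some((consumers.dequeue(), task, creator))
    } else None
}
```

In the real system the "references" are ActorRefs that may point at actors in another JVM, which is exactly why the design stays oblivious to locality.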

Remote message passing is enabled by a few lines of configuration:

akka {
	actor {
		provider = "akka.remote.RemoteActorRefProvider"
	}
	remote {
		transport = "akka.remote.netty.NettyRemoteTransport"
		netty {
			port = 2552
		}
	}
}

Akka makes it easy to send messages to local or remote actors. The code is exactly the same from the application's perspective; the Akka libraries automatically route each message to the correct actor.

Akka provides a very easy way to look up remote actors:

val actor = actorSystem.actorFor("akka://actorSystemName@server:2552/user/actorName")

To minimize explicit knowledge of actors’ locality, however, we try to limit the use of such lookups in our code. The only place we use one is in the slaves, to find the master’s MasterService actor, with which the slaves register themselves as TaskReceivers.

To detect cluster node connection/disconnection, we take advantage of Akka remote events.

On the slave side, we created a MasterMonitor actor dedicated to monitoring the master connection status. It tries to reconnect when the master is disconnected. On startup, the actor subscribes to the global RemoteClientLifeCycleEvent, and pings MasterService:

override def preStart = {
	context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
}

The response to the ping message is not important–in fact, MasterService doesn’t even handle “ping”. The ping is simply a way to trigger the RemoteClientLifeCycleEvent that MasterMonitor is interested in:

def receive = {
	case _: RemoteClientConnected   => updateMasterStatus(newValue = true)
	case _: RemoteClientWriteFailed => updateMasterStatus(newValue = false)
	case _: RemoteClientShutdown    => updateMasterStatus(newValue = false)
}

When the outbound connection is established, the actor receives RemoteClientConnected and sets the master connection status to true. When the master is disconnected for whatever reason, the actor receives either RemoteClientShutdown or RemoteClientWriteFailed and sets the status to false. Based on the master status, other slave actors such as TaskReceiver can adjust their behavior accordingly.
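The status flag itself can be as simple as a shared boolean that MasterMonitor flips and other slave actors consult before accepting work. A minimal sketch, with invented names (SAM's actual mechanism may differ):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical shared master-connection flag: flipped by a monitor on
// remote lifecycle events, read by actors like TaskReceiver before they
// accept new work.
object MasterStatus {
  private val connected = new AtomicBoolean(false)

  // Called from the monitor's receive on connect/disconnect events.
  def update(newValue: Boolean): Unit = connected.set(newValue)

  def isConnected: Boolean = connected.get
}
```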

MasterService takes a similar approach to handling status messages. A slave disconnection triggers a RemoteClientLifeCycleEvent:

case RemoteClientShutdown(_, addr)          => unregisterSlave(addr)
case RemoteClientWriteFailed(_, _, _, addr) => unregisterSlave(addr)

Because the master must maintain the statuses of multiple slaves, it uses the remote address carried by the event to tell which slave has disconnected. This simple implementation of cluster node status detection has proven very robust in practice: it works equally well for normal shutdowns and reconnects as for abnormal exits (e.g. kill -9).
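The master-side bookkeeping amounts to a map keyed by remote address. A minimal sketch, using a plain String in place of Akka's Address type and invented names (`registerSlave` is the only identifier taken from the snippets above; `unregisterSlave` appears there too):

```scala
import scala.collection.mutable

// Simplified model of the master's slave registry. Akka's remote lifecycle
// events carry the remote address of the affected connection, which is all
// the master needs to tell slaves apart.
class SlaveRegistry {
  private val slaves = mutable.Map.empty[String, String] // address -> slave name

  def registerSlave(addr: String, name: String): Unit = slaves(addr) = name

  // Called on RemoteClientShutdown / RemoteClientWriteFailed for `addr`.
  def unregisterSlave(addr: String): Unit = slaves -= addr

  def connected: Set[String] = slaves.keySet.toSet
}
```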

Even though Akka makes it easy to use remote actors, it is still good practice to understand how the distribution works. Specifically, one should be aware that:

  • Akka uses Java object serialization by default for remote messages. Make sure the message objects you use can be serialized and deserialized properly (case classes are always a good choice for messages)

  • The default maximum message size is 1 MB, so be mindful about data transfer between remote systems via message passing. When a message exceeds the limit, the error is not always obvious. (For example, if the master tries to send too large a message to a slave, a RemoteClientError is triggered on the slave side and a RemoteClientShutdown on the master side. Had we not subscribed to RemoteClientLifeCycleEvent, it would appear that the message was simply lost.)
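Because remote messages go through Java serialization by default, a plain case class round-trips cleanly — case classes are serializable out of the box and compare by structure, so a round-trip is easy to verify. A quick stdlib-only check (no Akka involved; `TaskResult` is an invented example message):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Case classes are Serializable out of the box and have structural
// equality, which is exactly what remote message passing needs.
case class TaskResult(taskId: Long, rows: Seq[String])

object SerializationCheck {
  // Serialize a message to bytes and read it back, as Akka's default
  // Java serialization would.
  def roundTrip[A](msg: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(msg)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[A]
  }
}
```

The byte array's size is also a cheap way to sanity-check a message against the size limit before sending it.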

As we build out SAM and gain more knowledge about Akka, we realize that a few things are less than ideal in the current design and implementation:

  • In most master-slave setups, master tends to become the bottleneck/single point of failure. It’s no different for SAM. Heavy usage of the RESTful API sometimes puts the master under load, and even a short downtime of the master may affect other systems that depend on it
  • Master and slave have different code bases; this makes remoting more difficult. For example, we have to take extra care to make sure the classes we are using as messages between the master and slave are available in both. We ended up putting all those in a common module, which adds some complexity to the whole project
  • We designed the master to be the sole location for storing data files. When a task requires files to be exchanged between the master and slave, we can’t send the content of the files as messages because they are usually too large. To work around that we make the sender send a url that the receiver can use to download the file. That becomes tedious quickly. Using a distributed file system such as HDFS can be helpful here

Going forward, we’d like to change the master-slave setup to a cluster of identical nodes with dynamic leader election. A major addition in Akka 2.1.x over 2.0.x is cluster support, including a gossip protocol for cluster membership. That can potentially simplify things quite a bit for us. We can’t wait to try it out!

SAM is not the type of high-throughput, low-latency system that Akka is usually used for; nonetheless, we find Akka an invaluable tool. Its actor model implementation makes building correct concurrent applications easy. In this post, we covered our usage of remote actors, which is only one of the many wonderful things Akka has to offer. We look forward to sharing more of our experiences with you.

Did you get this far? Are you interested in the type of scalable work we’re doing at AddThis?  If so, why not check out the engineering openings on our careers page?