AddThis Blog

Building a Distributed System with Akka Remote Actors

At AddThis, we deployed our first production system written in Scala almost two years ago. Since then, a growing number of new applications have been built using this exciting language. Among the many native Scala libraries we have tried and adopted, Akka stands out as the most indispensable.

Akka is a library for building concurrent scalable applications using the Actor Model. Its fault-tolerance model is heavily influenced by Erlang. In this post, I will talk about our experience with using Akka (2.0.x) remote actors to build a distributed system, SAM.

SAM is an internal tool for managing the data processing pipeline for billions of records daily. Much of the big data heavy lifting is done with our home-grown Hydra system. SAM functions as the command and control center that submits jobs and runs queries to advance the workflow. It also needs to do intermediary and post-processing of a non-trivial amount of data returned by Hydra.

SAM is designed to be horizontally scalable, such that it can scale out when the underlying Hydra cluster scales out, or when its own workload requires more processing power. It consists of a master and multiple slaves. The master is responsible for task allocation, bookkeeping (DB access), and providing a RESTful API. The slaves are responsible for task execution, such as running hydra queries and aggregating results.

A typical workflow starts when the master receives a user request as illustrated in the sequence diagram below (the participants are Actors, and interactions are message exchanges between them):

Remote message passing is enabled by a few lines of configuration:

akka {
...
	actor {
		provider = "akka.remote.RemoteActorRefProvider"
	}
	remote {
		transport = "akka.remote.netty.NettyRemoteTransport"
		netty {
			port = 2552
		}
	}
...
}

Akka makes it easy to send messages to local or remote actors. The code is exactly the same from the application's perspective; the Akka libraries automatically route each message to the correct actor.

Akka provides a very easy way to look up remote actors:

val actor = actorSystem.actorFor("akka://actorSystemName@server:2552/user/actorName")

To minimize explicit knowledge of actors' locality, however, we try to limit the use of such lookups in our code. The only place we use one is in the slaves, for finding the MasterService actor defined by the master, which allows the slaves to register themselves as TaskReceivers.
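The lookup path encodes everything Akka needs to reach the remote actor. As a plain-Scala illustration (no Akka required; the helper name is ours, not an Akka API), the path in the example above decomposes into protocol, actor-system name, host, port, and the user-space actor name:

```scala
// Illustration only (plain Scala, no Akka): the pieces of a remote actor path.
// protocol :// system-name @ host : port / user / actor-name
def remoteActorPath(system: String, host: String, port: Int, actor: String): String =
  s"akka://$system@$host:$port/user/$actor"

// Reassembles the lookup example above:
val path = remoteActorPath("actorSystemName", "server", 2552, "actorName")
// path == "akka://actorSystemName@server:2552/user/actorName"
```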

To detect cluster node connection/disconnection, we take advantage of Akka remote events.

On the slave side, we created a MasterMonitor actor dedicated to monitoring the master connection status. It tries to reconnect when the master is disconnected. On startup, the actor subscribes to the global RemoteClientLifeCycleEvent, and pings MasterService:

override def preStart(): Unit = {
	// Subscribe to remote lifecycle events (connect, disconnect, write failure)
	context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
	// Kick off the first connection attempt immediately
	reconnect(Duration.Zero)
}
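The reconnect helper itself is not shown here; in our code it schedules a ping to MasterService after the given delay and retries until the connection comes up. Stripped of Akka's scheduler, the retry logic can be modeled as a plain loop (the pingMaster function and reconnectLoop name are hypothetical stand-ins, not part of our actual code):

```scala
// Hypothetical Akka-free model of MasterMonitor's retry behavior:
// keep pinging until a ping gets through, up to maxAttempts tries.
// Returns the attempt number that succeeded, or None if we gave up.
def reconnectLoop(pingMaster: () => Boolean, maxAttempts: Int): Option[Int] = {
  var attempt = 0
  while (attempt < maxAttempts) {
    attempt += 1
    if (pingMaster()) return Some(attempt) // connection established
  }
  None // still disconnected after maxAttempts
}
```

In the real actor, each failed attempt simply reschedules another ping via the actor system's scheduler rather than spinning in a loop.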

The response to the ping message is not important (as a matter of fact, MasterService doesn't even handle “ping”); it is simply a way to trigger the RemoteClientLifeCycleEvent that MasterMonitor is interested in:

def receive = {
	case _: RemoteClientConnected   => updateMasterStatus(newValue = true)
	case _: RemoteClientWriteFailed => updateMasterStatus(newValue = false)
	case _: RemoteClientShutdown    => updateMasterStatus(newValue = false)
}

If the outbound connection is established, the actor receives RemoteClientConnected and sets the master connection status to true. When the master is disconnected for whatever reason, the actor receives either RemoteClientShutdown or RemoteClientWriteFailed and sets the master connection status to false. Based on the master status, other slave actors such as TaskReceiver can change their behavior accordingly.
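This status-tracking logic is simple enough to model without Akka. In the sketch below, the Akka 2.0 event types are replaced with local stand-in case objects so the logic runs on its own; the mapping mirrors the receive block above:

```scala
// Local stand-ins for the Akka 2.0 remote client lifecycle events,
// defined here only so this sketch runs without the Akka library.
sealed trait RemoteClientEvent
case object RemoteClientConnected   extends RemoteClientEvent
case object RemoteClientWriteFailed extends RemoteClientEvent
case object RemoteClientShutdown    extends RemoteClientEvent

// Tracks the master connection status the way MasterMonitor does.
class MasterStatus {
  private var connected = false
  def isConnected: Boolean = connected

  def handle(event: RemoteClientEvent): Unit = event match {
    case RemoteClientConnected   => connected = true  // outbound connection is up
    case RemoteClientWriteFailed => connected = false // master unreachable
    case RemoteClientShutdown    => connected = false // master shut down
  }
}
```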

MasterService takes a similar approach to tracking slave status. A slave disconnection triggers a RemoteClientLifeCycleEvent:

case RemoteClientShutdown(_, addr)          => unregisterSlave(addr)
case RemoteClientWriteFailed(_, _, _, addr) => unregisterSlave(addr)

Because the master must maintain the statuses of multiple slaves, it uses the remote address carried by the event to tell which slave disconnected. This simple implementation for detecting cluster node status has proven very robust in practice. It handles normal shutdown/reconnect and abnormal exits (e.g. kill -9) equally well.
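The master's bookkeeping amounts to a map keyed by remote address. A runnable sketch, with addresses modeled as plain "host:port" strings instead of akka.actor.Address and the registry class name being our invention for illustration:

```scala
import scala.collection.mutable

// Hypothetical sketch of the master's slave bookkeeping.
// Addresses are "host:port" strings here; the real code keys on akka.actor.Address.
class SlaveRegistry {
  private val slaves = mutable.Map.empty[String, String] // address -> slave name

  // Called when a slave registers itself as a TaskReceiver.
  def registerSlave(addr: String, name: String): Unit =
    slaves(addr) = name

  // Called from the RemoteClientShutdown / RemoteClientWriteFailed handlers:
  // the event's address identifies which slave dropped off.
  def unregisterSlave(addr: String): Unit =
    slaves -= addr

  def activeSlaves: Set[String] = slaves.values.toSet
}
```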

Even though Akka makes it easy to use remote actors, it is still good practice to understand how the distribution works under the hood.

As we build out SAM and gain more experience with Akka, we have realized that a few things in the current design and implementation are less than ideal.

Going forward, we’d like to change the master-slave setup to a cluster of identical nodes with dynamic leader election. A major addition in Akka 2.1.x over 2.0.x is cluster support, including a gossip protocol for cluster membership. That can potentially simplify things quite a bit for us. We can’t wait to try it out!

SAM is not the kind of high-throughput, low-latency system that Akka is usually used for; nonetheless, we find Akka an invaluable tool. Its actor model implementation makes building correct concurrent applications easy. In this post, we covered our usage of remote actors, which is only one of many wonderful things Akka has to offer. We look forward to sharing more of our experiences with you.

Did you get this far? Are you interested in the type of scalable work we're doing at AddThis? If so, why not check out the engineering openings on our careers page?