Reliable, Low-Latency Request-Reply with ZeroMQ

ZeroMQ (ZMQ) is a cross-platform, cross-language library that provides high-level wrappers around traditional low-level network sockets. Some of its most helpful features include sockets that automatically reconnect after connection failures, hostname-based connection strings that are easy to work with, and sensible defaults for buffer sizes and other parameters. However, ZMQ goes a step further than just simplifying initialization and error handling. Instead of the standard send and receive socket functions, it supports several socket patterns that restrict which socket types can be paired, which can send or receive, and the sequencing of send and receive calls. Since the library's design and documentation are fairly opinionated about the right way to do things, using ZMQ can be somewhat difficult if your application doesn't fit neatly into one of its predefined connection patterns.

Why We Needed a Reliable and Low Latency Request-Reply Implementation

Here at AddThis, our website tools are loaded over 3 billion times a day and each of these events results in an HTTP request to our datacenter. We have a variety of internal services that need to subscribe and respond to these events to power features such as dynamic tool configurations and personalization. In this context, “internal” implies that the service is running in the same datacenter and can communicate with low latency. It would be very expensive for us in terms of bandwidth and page load time to have our website tools hit each of these services individually over HTTP. Instead, we use one externally-facing HTTP coordinator application that forwards these incoming events to the relevant internal services and then aggregates their results into a single HTTP response.

In terms of the standard ZMQ request-reply pattern, the HTTP coordinator is the client (request socket) and the internal services are the servers (reply sockets). In order to minimize the overall response time, the coordinator needs to call each of these services in parallel. This makes the overall response time equal to that of the slowest-responding service, whereas calling services sequentially would take as long as the sum of their response times. Since we have strict requirements on our overall response time (which is reflected in our publishers' page load times), the coordinator needs to be able to abandon responses from internal services if they become too slow or unresponsive, while still returning a non-error HTTP response to the browser. Additionally, each service typically consists of many separate instances across multiple physical machines in order to handle the load (over 50k events/second at peak).

These requirements pose several problems for the built-in request-reply sockets provided by ZMQ. First, request-reply sockets have strict requirements on the sequencing of send and receive calls. Specifically, each transaction consists of the following sequence: client send, server receive, server send, and client receive. Because the receive methods are blocking, they can be started before the corresponding send calls are initiated; however, this means that if the send never happens, the receiver will hang waiting for a response. Furthermore, trying to ignore a response and resume sending will result in ZMQ throwing invalid-state exceptions, essentially making the current socket useless. This is a significant problem for our application, where hanging receives would result in either our internal services becoming unresponsive or our coordinator returning HTTP errors when browsers try to load our website tools. Although the ZMQ documentation does have several suggestions for handling failed responses, its solutions are typically inelegant, require sockets to be closed and recreated, and are still subject to race conditions. It suggests that both client and server abandon responses after a given timeout and that the client reconnect; however, a simple wall-clock timeout does not guarantee that both sides will give up at the same time, so the server could still continue processing a request that the client has already abandoned and/or resent.
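
To make this lock-step constraint concrete, here is a minimal sketch in Java using the jzmq/JeroMQ-style API. The endpoint, payloads, and 100 ms timeout are placeholders rather than values from our system; it simply shows a REQ socket abandoning a slow reply and then failing on its next send.

    import org.zeromq.ZMQ;
    import org.zeromq.ZMQException;

    public class ReqTimeoutSketch {
        public static void main(String[] args) {
            ZMQ.Context ctx = ZMQ.context(1);
            ZMQ.Socket req = ctx.socket(ZMQ.REQ);
            req.connect("tcp://localhost:5555"); // hypothetical reply-socket endpoint
            req.setReceiveTimeOut(100);          // give up on a reply after 100 ms

            req.send("event-1".getBytes(), 0);   // client send
            byte[] reply = req.recv(0);          // client receive; null if the timeout fires

            if (reply == null) {
                // The REQ socket is still waiting for a reply, so the next send breaks
                // the required send/receive sequence and the socket reports an
                // invalid-state error; the usual workaround is to close and recreate it.
                try {
                    req.send("event-2".getBytes(), 0);
                } catch (ZMQException e) {
                    System.err.println("socket unusable after abandoned reply: " + e);
                }
            }

            req.close();
            ctx.term();
        }
    }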

Second, ZMQ request-reply sockets have no support for multiplexing multiple concurrent requests. Each instance of our HTTP coordinator receives thousands of requests per second and handles many of them in parallel while waiting for internal services to return. Since each internal service runs on a variable number of machines (which are load balanced based on response time), a single coordinator can be expected to have multiple concurrent requests in flight to a single instance of an internal service. With ZMQ request-reply sockets, these requests would either need to be executed sequentially (slowing down the requests executed later), or we would need to dynamically create additional request-reply socket pairs as requests become queued.

Rather than trying to work around these limitations and fit our application to ZMQ's mold, we implemented a more robust request-reply pattern that uses ZMQ push-pull and pull-push socket pairs. Push and pull sockets are in many respects even simpler than request-reply sockets: each push socket may only call send, and each pull socket may only call receive. While this requires you to create both a push and a pull socket if you want to both send and receive, it eliminates the possibility of invalid-state and sequencing errors. In the resulting architecture, both the client and the server use one thread for pulling and one thread for pushing, with concurrent queues used to communicate between threads.
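
As a rough sketch of that wiring (the ports and localhost endpoints below are illustrative placeholders; in production the connect calls would target the actual service and coordinator hostnames), the coordinator binds a PULL socket at its reply-address and connects a PUSH socket per service instance, while each service does the mirror image:

    import org.zeromq.ZMQ;

    public class WiringSketch {
        public static void main(String[] args) {
            ZMQ.Context ctx = ZMQ.context(1);

            // Coordinator side: one PULL socket bound at the reply-address for responses,
            // plus one PUSH socket connected to each internal service instance.
            ZMQ.Socket replyPull = ctx.socket(ZMQ.PULL);
            replyPull.bind("tcp://*:7000");

            ZMQ.Socket requestPush = ctx.socket(ZMQ.PUSH);
            requestPush.connect("tcp://localhost:7001"); // would be a service instance host

            // Service side (normally a separate process): the mirror image.
            ZMQ.Socket requestPull = ctx.socket(ZMQ.PULL);
            requestPull.bind("tcp://*:7001");

            ZMQ.Socket replyPush = ctx.socket(ZMQ.PUSH);
            replyPush.connect("tcp://localhost:7000");   // would be the coordinator's reply-address
        }
    }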

HTTP Coordinator: Push-Pull Client

For each external HTTP request, our coordinator must first decide which services are subscribed to that event, and which specific instances of each service will handle the request. Each call to an internal service represents one request-reply transaction. For each transaction, we create a unique id from an atomically incremented counter. Next, we create a Future-like object to store the service's response for the transaction, which will later signal completion using a CountDownLatch. This future is inserted into a ConcurrentHashMap keyed on the transaction id so that our response handling thread will later be able to release its latch once we have the response. Then the transaction (including its unique id, internal service hostname, and the HTTP event) is added to a request queue, implemented with a LinkedBlockingQueue, to be consumed by a ZMQ pushing thread.
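
A minimal sketch of that bookkeeping is shown below. The class and field names (ResponseFuture, Transaction, TransactionRegistry) are illustrative rather than our actual code, and an AtomicLong stands in for the atomically incremented id counter.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    // Future-like holder that the response handler thread completes later.
    class ResponseFuture {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile byte[] response;

        void complete(byte[] r) { response = r; latch.countDown(); }

        byte[] get(long timeoutMs) throws InterruptedException {
            latch.await(timeoutMs, TimeUnit.MILLISECONDS);
            return response;              // null if the service was too slow
        }
    }

    // One request-reply transaction against one internal service instance.
    class Transaction {
        final long id;
        final String serviceHost;         // e.g. "tcp://host:port" of the target instance
        final byte[] eventData;

        Transaction(long id, String serviceHost, byte[] eventData) {
            this.id = id; this.serviceHost = serviceHost; this.eventData = eventData;
        }
    }

    class TransactionRegistry {
        private final AtomicLong nextId = new AtomicLong();
        final ConcurrentHashMap<Long, ResponseFuture> pending = new ConcurrentHashMap<>();
        final LinkedBlockingQueue<Transaction> requestQueue = new LinkedBlockingQueue<>();

        // Called on the HTTP request thread for each subscribed service instance.
        ResponseFuture submit(String serviceHost, byte[] eventData) {
            long id = nextId.incrementAndGet();
            ResponseFuture future = new ResponseFuture();
            pending.put(id, future);
            requestQueue.add(new Transaction(id, serviceHost, eventData));
            return future;
        }
    }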

The ZMQ pushing thread is a very simple loop that takes transactions from the request queue and sends them to the appropriate internal service. For each transaction, we have the unique id, the target service hostname, and the HTTP event data. First we check whether we already have a connection to the target host stored in a thread-local HashMap keyed on the hostname. If not, a new push socket connection to that host is created and stored in the HashMap. Then the transaction id, HTTP event data, and the reply-address (which is the hostname of the local coordinator application) are sent to the service in a single ZMQ message.
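
Here is a sketch of that pushing loop, reusing the illustrative TransactionRegistry classes above. A plain HashMap owned by this single thread stands in for the thread-local socket cache, and serialize is a placeholder for our real wire format, which is omitted here.

    import java.util.HashMap;
    import java.util.Map;
    import org.zeromq.ZMQ;

    // ZMQ pushing thread: drains the request queue and pushes each transaction
    // to its target service, caching one PUSH socket per host.
    class PushLoop implements Runnable {
        private final ZMQ.Context ctx;
        private final TransactionRegistry registry;
        private final String replyAddress;                 // this coordinator's pull endpoint
        private final Map<String, ZMQ.Socket> sockets = new HashMap<>(); // used only by this thread

        PushLoop(ZMQ.Context ctx, TransactionRegistry registry, String replyAddress) {
            this.ctx = ctx; this.registry = registry; this.replyAddress = replyAddress;
        }

        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Transaction t = registry.requestQueue.take();
                    ZMQ.Socket push = sockets.computeIfAbsent(t.serviceHost, host -> {
                        ZMQ.Socket s = ctx.socket(ZMQ.PUSH);
                        s.connect(host);
                        return s;
                    });
                    // serialize(...) stands in for real serialization, which packs the
                    // transaction id, event data, and reply-address into one ZMQ message.
                    push.send(serialize(t.id, t.eventData, replyAddress), 0);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private byte[] serialize(long id, byte[] eventData, String replyAddress) {
            // placeholder: real code would use an actual wire format
            return (id + "|" + replyAddress + "|" + new String(eventData)).getBytes();
        }
    }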

Independently of the coordinator application, our internal service applications will be running on arbitrary machines throughout the cluster. For now, you can assume that they are pulling in the requests sent by the coordinator, performing some operations on their event data, and pushing responses back to the coordinator at the reply-address.

Back in the HTTP coordinator, there is a separate ZMQ pull thread to receive these responses. It simply initializes a pull socket, listening at the reply-address included in each request, then performs an unending loop of receiving responses from the pull socket and moving them to a LinkedBlockingQueue. Like the pushing thread, the work done in this loop is kept to a minimum in order to consume ZMQ messages as quickly as possible and prevent this single thread from becoming a bottleneck.
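
A corresponding sketch of the pulling loop is shown below (again with illustrative names; the reply-address endpoint is whatever this coordinator advertises to the services):

    import java.util.concurrent.LinkedBlockingQueue;
    import org.zeromq.ZMQ;

    // ZMQ pulling thread: receives raw response messages as fast as possible and
    // hands them off to the response handler via a queue.
    class PullLoop implements Runnable {
        private final ZMQ.Context ctx;
        private final String replyAddress;                    // e.g. "tcp://*:7000"
        private final LinkedBlockingQueue<byte[]> responseQueue;

        PullLoop(ZMQ.Context ctx, String replyAddress, LinkedBlockingQueue<byte[]> responseQueue) {
            this.ctx = ctx; this.replyAddress = replyAddress; this.responseQueue = responseQueue;
        }

        public void run() {
            ZMQ.Socket pull = ctx.socket(ZMQ.PULL);
            pull.bind(replyAddress);
            while (!Thread.currentThread().isInterrupted()) {
                byte[] raw = pull.recv(0);      // blocks until a response arrives
                if (raw != null) {
                    responseQueue.add(raw);     // keep this loop as cheap as possible
                }
            }
            pull.close();
        }
    }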

Finally, a response handler thread consumes responses from this queue and updates the result future created when the transaction was first started. Since the response message contains the transaction id, we can look up the future in our shared ConcurrentHashMap, set the response, and release the CountDownLatch so that threads waiting on the result of this transaction are unblocked. In our case, the HTTP coordinator uses a single thread to process each request due to our use of the Jetty library. This thread creates several transactions depending on which services are subscribed to the HTTP event, and then it blocks (with a configurable timeout) on the transactions' CountDownLatches while the ZMQ threads asynchronously handle pushing the requests and pulling the replies. Once all transactions are complete or the timeout has been reached, the coordinator merges the responses that it has received into a single HTTP response.
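
A sketch of the response handler, continuing with the illustrative classes above; extractTransactionId is a placeholder for real deserialization, and the trailing comment shows roughly how the HTTP request thread waits on the result:

    import java.util.concurrent.LinkedBlockingQueue;

    // Response handler thread: matches each response to its pending transaction
    // and releases the waiting HTTP request thread.
    class ResponseHandler implements Runnable {
        private final TransactionRegistry registry;
        private final LinkedBlockingQueue<byte[]> responseQueue;

        ResponseHandler(TransactionRegistry registry, LinkedBlockingQueue<byte[]> responseQueue) {
            this.registry = registry; this.responseQueue = responseQueue;
        }

        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    byte[] raw = responseQueue.take();
                    long id = extractTransactionId(raw);   // placeholder for real deserialization
                    ResponseFuture future = registry.pending.remove(id); // remove so completed entries don't accumulate
                    if (future != null) {                  // may be null if the entry was already cleaned up
                        future.complete(raw);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private long extractTransactionId(byte[] raw) {
            // placeholder wire format: the id is the first "|"-delimited field
            return Long.parseLong(new String(raw).split("\\|", 2)[0]);
        }
    }

    // On the HTTP request thread (one per request under Jetty), after submitting
    // all transactions for the event:
    //   byte[] reply = future.get(timeoutMs); // null means the service was too slow; merge what we have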

This is a relatively high-level description that leaves out several application-specific implementation details. In particular:

  • how serialization is used to merge several fields into a single ZMQ message,
  • how ports are selected and included with hostnames,
  • how instances of internal services are registered and discovered by the coordinator using ZooKeeper, and
  • how multiple instances of each service are load balanced by response time.

Internal Services: Pull-Push Servers

The internal services are implemented roughly as mirror images of the push-pull client side: instead of pushing a request and then pulling a response, each service first pulls a request and then pushes a response.

The first service-side stop for each request is the ZMQ pulling thread. Just like the client-side pulling thread, it initializes a ZMQ pull socket, then performs an unending loop that receives from the socket and moves the request to a LinkedBlockingQueue.

Meanwhile, a request handler thread continuously loops and pulls from this request queue. For each request, a Runnable is created that runs the internal service logic on the event data and then adds the response (along with the transaction id and reply-address) to a LinkedBlockingQueue of responses. This Runnable is submitted to a threadpool and runs asynchronously. The number of threads in this pool depends on how much computation needs to be done for that specific service, and how many requests the service is expected to handle concurrently.
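
A sketch of that handoff on the service side is shown below, with a pool of eight workers chosen arbitrarily for illustration. The request queue is filled by the service's pull thread (the same shape as the coordinator's PullLoop above), and handle stands in for the actual service logic.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    // Service-side request handler: drains the request queue filled by the pull
    // thread and runs the actual service logic on a worker pool.
    class RequestHandler implements Runnable {
        private final LinkedBlockingQueue<byte[]> requestQueue;   // filled by the ZMQ pull thread
        private final LinkedBlockingQueue<byte[]> responseQueue;  // drained by the ZMQ push thread
        private final ExecutorService workers = Executors.newFixedThreadPool(8); // sized per service

        RequestHandler(LinkedBlockingQueue<byte[]> requestQueue,
                       LinkedBlockingQueue<byte[]> responseQueue) {
            this.requestQueue = requestQueue; this.responseQueue = responseQueue;
        }

        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    byte[] request = requestQueue.take();
                    workers.submit(() -> {
                        // handle(...) stands in for the service's real logic; the response
                        // must carry the transaction id and reply-address from the request.
                        byte[] response = handle(request);
                        responseQueue.add(response);
                    });
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private byte[] handle(byte[] request) {
            return request; // placeholder: echo the request back
        }
    }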

Finally, a ZMQ pushing thread continuously loops and pulls from the response queue. Just like the client-side push thread, it checks if it already has an open connection to the response’s reply-address, creating and caching a new push socket if needed. Then it sends the response and transaction id to the reply-address using this push socket. As covered in the coordinator’s overview, this push socket is connected to the client-side pull socket which will receive the response and complete the transaction.

So, what’s the takeaway?

While ZMQ provides very useful wrappers around low-level networking sockets, its built-in request-reply sockets don't have good support for timeouts, dropped requests, or multiplexing parallel requests. Instead of using a single thread that sends and receives on the same socket, this alternative implementation uses ZMQ push and pull sockets and joins requests with responses using transaction ids. The resulting request-reply pattern is significantly more reliable and better suited to high-throughput, low-latency applications.

  • Steve Donnelly

    Thanks for sharing. After briefly looking at the documentation and code for Mongrel2, it appears to have a comparable design to our HTTP coordinator at a high level and should have similarly low latency. It doesn't seem to expose many options for timeouts and backend load balancing, so if you need that fine-grained control, the design in this post may still be useful for rolling your own.

  • artem-v

    I suppose you are on Java, so why did you guys pick zmq instead of netty?

  • Steve Donnelly

    It’s fairly subjective but ZeroMQ was chosen because its API seemed much easier to learn, while also requiring less tuning for performance. This system was developed as a replacement for a previous backend that was starting to struggle at peak traffic rates but was too complicated to optimize. Thus, minimizing development time and code complexity were high priority, while the performance of ZeroMQ seemed unlikely to be a bottleneck.

    Since I’m only slightly familiar with Netty, it’s hard to say whether or not it would have been a better choice for implementing a request-reply pattern. I’m curious what the trade-offs would have been in terms of code size and performance.

  • artem-v

    Are you guys using JNI zmq or pure Java jeromq? If the former, have you experienced any issues related to deploying and running the native library along with your Java application?

  • Steve Donnelly

    We use JNI zmq. I can’t say that we’ve had many problems deploying or running with the native library. Typically we have libzmq deployed to our servers as a Chef recipe, while the libjzmq binary is packaged with our applications. Conveniently, since all of our services run on the same OS and architecture we only need one version of the jzmq binary.