Clearspring’s Big Data Architecture, Part 3

Querying Data

This is Part 3 of a four-part series of blog posts (see Part 1 and Part 2) discussing Clearspring’s big data architecture.  This post describes the distributed query system we use to quickly access terabytes of data distributed across hundreds of machines. The query subsystem consists of two key components, the QueryMaster and the QuerySlave.  A single cluster can have multiple QueryMasters, and each processing node in the cluster will have one or more QuerySlaves.

The QueryMaster accepts a query, identifies which job the query is for, sends the query to the QuerySlaves that have data for the specified job, and then aggregates the responses from each slave into a single result set that is provided back to the client.

QuerySlaves and the QueryMaster communicate via ZooKeeper and RabbitMQ.  It is important to store the mapping of jobs to nodes in a distributed configuration management system like ZooKeeper because the system is not static.  Data in the cluster can move from one node to another when the system re-balances data or when a node fails.  This dynamic mapping allows the QueryMaster to only query the nodes we know have data and dynamically change the set of nodes queried whenever the location of data in the cluster changes.
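The dynamic job-to-node mapping can be sketched in a few lines of Python. This is a minimal illustration, not Hydra’s actual code: a plain dict stands in for ZooKeeper, and all job and node names are invented.

```python
# Sketch of the job-to-node mapping the QueryMaster consults before
# dispatching a query. A plain dict stands in for ZooKeeper here; in the
# real system the mapping lives in ZooKeeper so it can change as data is
# re-balanced or nodes fail. All names below are illustrative.

job_locations = {
    "url-counts": {"node-01", "node-04", "node-07"},
    "referrers":  {"node-02", "node-03"},
}

def nodes_for_job(job_id):
    """Return only the nodes known to hold data for this job."""
    return sorted(job_locations.get(job_id, set()))

def on_rebalance(job_id, moved_from, moved_to):
    """Update the mapping when data moves, so future queries follow it."""
    nodes = job_locations[job_id]
    nodes.discard(moved_from)
    nodes.add(moved_to)

print(nodes_for_job("url-counts"))   # ['node-01', 'node-04', 'node-07']
on_rebalance("url-counts", "node-04", "node-09")
print(nodes_for_job("url-counts"))   # ['node-01', 'node-07', 'node-09']
```

Because the QueryMaster looks the mapping up at query time rather than caching a static node list, a query issued after a re-balance automatically reaches the new data locations.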

The query is broken down into local and remote operations.  Operations are transformations such as aggregate, sort, pivot, multiply, filter, and concatenate.  Local operations are performed by the QueryMaster and remote operations are performed by the QuerySlaves.  Remote operations allow the system to take advantage of the resources available across the cluster: having each node sort a subset of the data is much more efficient than having the QueryMaster sort the full unsorted data set.  Remote operations can also significantly reduce the amount of data transferred between the QuerySlaves and the QueryMaster.
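The sort example can be sketched as follows — a minimal illustration, not Hydra’s actual code, with invented shard contents:

```python
import heapq

# Sketch of why sorting is pushed to the slaves: each slave sorts only its
# own shard, and the master merges the already-sorted streams instead of
# sorting the full data set itself. Shard contents are invented.

shard_a = [("u3", 9), ("u1", 4)]
shard_b = [("u2", 7), ("u4", 2)]

# Remote operation: each QuerySlave sorts its local subset in parallel.
sorted_a = sorted(shard_a)
sorted_b = sorted(shard_b)

# Local operation: the QueryMaster merges the sorted partial results,
# a streaming k-way merge rather than a full re-sort of all the data.
merged = list(heapq.merge(sorted_a, sorted_b))
print(merged)  # [('u1', 4), ('u2', 7), ('u3', 9), ('u4', 2)]
```

The merge also illustrates the bandwidth point: a slave that additionally filters or aggregates before responding ships far fewer rows to the master.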

Query Example

The best way to understand how the query system works is to look at an example.  We have developed a query language specific to querying data from trees.  Imagine that we have a tree with the path ymd/{date}/{type}/{url}, where the values inside the brackets are data elements populated from the input values added to the tree, and ‘ymd’ is a constant that represents the root of the tree.  The following example queries data from this tree for the dates 110322 and 110323 for any type.
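The original query was lost from this copy of the post; based on the syntax described below (a ‘+’ prefix marks elements to return, and ‘+:+count’ requests the count attachment), it would look something like the following reconstruction:

```
ymd/+110322,110323/+/+:+count
```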

The results for this query would look like:
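(The original result listing was lost from this copy; the types, URLs, and counts below are invented to illustrate the shape of the output:)

```
110322  share  http://www.example.com/page1  52
110322  share  http://www.example.com/page2  17
110323  share  http://www.example.com/page1  48
```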

So this query returns the number of times we saw a URL on a given day.  The ‘+:+count’ fragment at the end of the query instructs the query system to return the meta-data ‘count’ for that node in the tree. Count is a data attachment that is automatically attached to every node in the tree; it simply counts the number of times a specific path through the tree has been followed. If we replace ‘+110322,110323’ with ‘110322,110323’, the dates are removed from the result set but are still used to filter the data.  The results of the modified query would look like:
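(Again reconstructed with invented values — the same rows as before, minus the date column. Note that the same URL can now appear once per matching date unless a gather operation, described in the next section, merges the rows:)

```
share  http://www.example.com/page1  52
share  http://www.example.com/page2  17
share  http://www.example.com/page1  48
```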

The QueryMaster also provides a query caching infrastructure based on the structure of the query.  This enables the QueryMaster to return data quickly in cases where the same query is executed multiple times without querying the slaves in the cluster.


Query Operations

Query operations allow a query designer to transform the results of a query to meet their needs.  Here are a few common operations available in the system.

Each of these operations can be executed on the QueryMaster and/or the QuerySlaves. As an example, suppose we wanted to count the number of times a URL was shared, as in the example above. In this case we could use a gather operation on the slaves, followed by a gather and a sort on the master:

If the raw data on a data node looked like:
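(The original listing was lost from this copy; invented URL/count rows of the same shape, with duplicate URLs not yet merged:)

```
http://www.example.com/page1  1
http://www.example.com/page2  1
http://www.example.com/page1  1
```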

Then a QuerySlave, using the operation ‘gather=ks’, would return:
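(Assuming ‘gather=ks’ groups on the URL key and sums the counts, duplicate URLs in the raw data collapse into single rows; values invented:)

```
http://www.example.com/page1  2
http://www.example.com/page2  1
```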

The QueryMaster will receive result sets like the one above from each QuerySlave in the cluster and then perform the gather operation on all of those results.  Finally, the QueryMaster sorts the results in descending order, using the count column as the sort key.
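The two-level gather-and-sort described above can be sketched as follows. This assumes ‘gather=ks’ means “group on the key column and sum the counts” — an interpretation, not Hydra’s documented semantics — and all row data is invented.

```python
from collections import defaultdict

# Sketch of the two-level gather: each slave collapses its local duplicates,
# then the master gathers the partial results and sorts on the count column.

def gather_ks(rows):
    """Group (key, count) rows on the key and sum the counts."""
    totals = defaultdict(int)
    for key, count in rows:
        totals[key] += count
    return list(totals.items())

# Remote operation: partial results from two hypothetical QuerySlaves.
slave1 = gather_ks([("http://a", 1), ("http://b", 1), ("http://a", 1)])
slave2 = gather_ks([("http://a", 1), ("http://c", 1)])

# Local operations on the QueryMaster: gather the partial results again,
# then sort descending with the count column as the sort key.
merged = gather_ks(slave1 + slave2)
final = sorted(merged, key=lambda kv: kv[1], reverse=True)
print(final)  # [('http://a', 3), ('http://b', 1), ('http://c', 1)]
```

Because each slave has already collapsed its duplicates, the master’s final gather sees at most one row per key per slave — far less data than the raw rows on the data nodes.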



This post has provided a high-level overview of the distributed query engine that provides fast and flexible access to the data stored in our clusters.  The final post in the series will focus on the command and control subsystem that orchestrates job execution and resource management in our clusters.  If you’re interested in learning more about Hydra and other big data architectures, please join us at the next Big Data DC monthly meetup.