Clearspring’s Big Data Architecture, Part 2

This is part two of a four-part series on Clearspring’s approach to big data analytics. (See part 1.) This post focuses on building tree-based data structures to store the massive quantity of information that Clearspring receives each day. Trees have several characteristics that make them an ideal data structure for our needs. For example, a tree provides a compressed, indexed representation of our data in which a path through the tree represents a row of data. Trees can also be sharded and distributed throughout our clusters, which allows us to create massive trees composed of hundreds of different shards. At query time we can easily gather and merge information from each shard.
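As a rough illustration (the field layout and values here are hypothetical), a single row of input data and the tree path that encodes it might look like:

```
row:   2012-05-01 | http://www.example.com/a
path:  root -> 120501 -> http://www.example.com/a
```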

 

Tree Building Process

The foundation of our distributed analytics framework is the tree-building process. We selected a tree-based storage structure for our processed data because we feel it fills a useful middle ground between the data-model-centric view of row-oriented databases and the query-centric view more common with column-oriented databases.

The TreeBuilder consumes data from one or more data sources (typically a set of Stream Servers) and then builds a tree based on the input data and a job configuration that defines the desired tree structure. An example tree might look like:
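(The branch name, dates, and URLs in this sketch are purely illustrative.)

```
root
  ymd
    120501
      http://www.example.com/a
      http://www.example.com/b
    120502
      http://www.example.com/a
```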

One of the more interesting aspects of our tree data structure is the concept of data attachments. We often run into situations where we’d like to store certain bits of information in a way that is easily retrievable. Some nodes in a tree may have many millions of children, so answering questions like “Which URLs were clicked the most times?” can be very expensive. This is where data attachments come into play. We can use operations from our recently open-sourced stream-lib project to find the “top” URLs and store them as a data attachment on the parent node. This top list is updated dynamically: as each new record is processed, the data attachment is updated to reflect the current list of n top elements. The top list is lossy, but if it is large enough the results are generally accurate enough for our needs. Stream-lib is a powerful library that makes it possible to count a huge number of elements in a distributed system, find the top-k elements in massive lists of data, and perform membership detection using Bloom filters. You can read more about stream-lib here or check out the source code on GitHub.
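As a minimal sketch of the idea, here is how stream-lib’s StreamSummary class (an implementation of the Space-Saving algorithm) can be used to maintain an approximate top-URL list; the capacity and URLs below are illustrative rather than the values we use in production:

```java
import com.clearspring.analytics.stream.Counter;
import com.clearspring.analytics.stream.StreamSummary;

import java.util.List;

public class TopUrlsExample {
    public static void main(String[] args) {
        // Track an approximate "top URLs" list in bounded memory.
        // A larger capacity gives a more accurate (but larger) summary.
        StreamSummary<String> topUrls = new StreamSummary<String>(1000);

        // In practice each record processed by the TreeBuilder would offer
        // its URL here; these values are illustrative only.
        topUrls.offer("http://www.example.com/a");
        topUrls.offer("http://www.example.com/b");
        topUrls.offer("http://www.example.com/a");

        // Retrieve the current top-k estimate, e.g. to store as the
        // "topurls" data attachment on the parent node.
        List<Counter<String>> top = topUrls.topK(10);
        for (Counter<String> counter : top) {
            System.out.println(counter.getItem() + " : " + counter.getCount());
        }
    }
}
```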

Let’s take a look at an example JSON configuration used to create a tree:
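(The snippet below is an illustrative sketch of such a configuration; the field names and the "top.hits" and "size" parameters are hypothetical stand-ins for the actual TreeBuilder job syntax.)

```json
{
  "root": "urltree",
  "branches": [
    {
      "name": "ymd",
      "levels": [
        {"key": "date:ymd", "data": {"topurls": {"type": "top.hits", "size": 50}}},
        {"key": "url"}
      ]
    },
    {
      "name": "ym",
      "levels": [
        {"key": "date:ym", "data": {"topurls": {"type": "top.hits", "size": 50}}},
        {"key": "url"}
      ]
    }
  ]
}
```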

In the example above we create a simple tree with the root “urltree”. Underneath “urltree” we have two branches, “ymd” and “ym”, representing data broken down by day and by month. Each of those branches has two levels of children: the first level contains a list of dates and the second level contains a list of URLs. The first level of both branches has one data attachment called “topurls”, which contains a list of the URLs seen most often. So if we want to know which URL was seen the most times in the month of May, or on a specific day in May, that data can be retrieved quickly without iterating over all of the children of the date node. The resulting tree with sample data may look like:
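(The dates, URLs, and counts in this sketch are illustrative.)

```
urltree
  ymd
    120501   {topurls: [http://www.example.com/a (20), http://www.example.com/b (11), ...]}
      http://www.example.com/a
      http://www.example.com/b
      ...
    120502   {topurls: [...]}
      ...
  ym
    1205     {topurls: [http://www.example.com/a (312), ...]}
      http://www.example.com/a
      http://www.example.com/b
      ...
```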

We use Berkeley DB JE as the underlying database for each job. Berkeley DB JE is a powerful database and provides a nice abstraction layer over the storage structure we wanted to use. It is very common for our jobs to store more data than can practically fit into memory, so we created a library called PageDB that provides an abstraction layer between Berkeley DB JE and the rest of our system. PageDB tracks and estimates memory consumption for each database that is paged into memory. It groups keys and their values together for efficient block-level compression, which reduces the disk I/O required to store and retrieve data. Rather than storing first-class Java objects in memory, PageDB keeps data in a compressed byte format, which consumes significantly less heap space.
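To illustrate the general pattern PageDB builds on (this is a simplified sketch, not PageDB’s actual API), the example below groups a few keys and values into a single page, compresses that page with gzip, and stores it as one Berkeley DB JE record; the directory, keys, and values are hypothetical:

```java
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.GZIPOutputStream;

public class CompressedPageExample {
    public static void main(String[] args) throws Exception {
        // Open (or create) a Berkeley DB JE environment and database.
        File home = new File("/tmp/pagedb-example");
        home.mkdirs();
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(true);
        Environment env = new Environment(home, envConfig);

        DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setAllowCreate(true);
        Database db = env.openDatabase(null, "pages", dbConfig);

        // Group several keys and their values together into a single "page".
        TreeMap<String, String> page = new TreeMap<String, String>();
        page.put("120501/http://www.example.com/a", "20");
        page.put("120501/http://www.example.com/b", "11");

        // Serialize and compress the whole page as one block...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(bytes);
        for (Map.Entry<String, String> entry : page.entrySet()) {
            gzip.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes("UTF-8"));
        }
        gzip.close();

        // ...and store the compressed block under the page's first key.
        DatabaseEntry key = new DatabaseEntry(page.firstKey().getBytes("UTF-8"));
        DatabaseEntry value = new DatabaseEntry(bytes.toByteArray());
        db.put(null, key, value);

        db.close();
        env.close();
    }
}
```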

 

What’s Next?

Parts one and two of this series introduced Hydra, Stream Servers, and the tree-building process. These components provide the foundation for the distributed query engine we use to retrieve information from our clusters, which will be the topic of the next post in this series. If you’d like to work on problems like these, we are always hiring.