Clearspring’s Big Data Architecture, Part 3

We highly recommend making your way over to the Clearspring blog to read Part 3 of our Big Data Architecture blog series.

The latest post describes the distributed query system we use to quickly access terabytes of data that is distributed across hundreds of machines. The query subsystem is comprised of two key components, QueryMaster and QuerySlave.  A single cluster can have multiple QueryMasters and each processing node in the cluster will have one or more QuerySlaves.

Also, if you need catching up, here are Parts 1 and 2.

Clearspring’s Big Data Architecture, Part 3

Querying Data

This is Part 3 of a four-part series of blog posts (see Part 1 and Part 2) discussing Clearspring’s big data architecture.  This post describes the distributed query system we use to quickly access terabytes of data distributed across hundreds of machines. The query subsystem consists of two key components, QueryMaster and QuerySlave.  A single cluster can have multiple QueryMasters, and each processing node in the cluster has one or more QuerySlaves.


The QueryMaster accepts a query, identifies which job the query is for, sends the query to the QuerySlaves that have data for the specified job, and then aggregates the responses from each slave into a single result set that is provided back to the client.

QuerySlaves and the QueryMaster communicate via ZooKeeper and RabbitMQ.  It is important to store the mapping of jobs to nodes in a distributed configuration management system like ZooKeeper because the system is not static.  Data in the cluster can move from one node to another when the system re-balances data or when a node fails.  This dynamic mapping allows the QueryMaster to only query the nodes we know have data and dynamically change the set of nodes queried whenever the location of data in the cluster changes.

The query is broken down into local and remote operations.  Operations are things like aggregate, sort, pivot, multiply, filter, concatenate, and so on.  Local operations are performed by the QueryMaster and remote operations are performed by the QuerySlaves.  Remote operations allow the system to take advantage of the system resources available in the cluster.  Having each node sort a subset of the data is much more efficient than having the QueryMaster sort the full set of unsorted data.  Remote operations can also significantly reduce the amount of data transferred between the QuerySlave and QueryMaster.
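To make the split between remote and local operations concrete, here is a minimal Python sketch. It is not Hydra code: the job name, node names, and sample rows are invented, and a plain dictionary stands in for the job-to-node mapping that the post says is kept in ZooKeeper. Each simulated QuerySlave sorts only its own rows, and the simulated QueryMaster merges the pre-sorted partial results instead of sorting everything itself.

```python
import heapq

# Hypothetical job-to-node mapping (kept in ZooKeeper in the real system).
JOB_NODES = {"url-counts": ["node-a", "node-b"]}

# Hypothetical (url, count) rows held by each QuerySlave.
NODE_DATA = {
    "node-a": [("a.com", 3), ("c.com", 9), ("b.com", 1)],
    "node-b": [("d.com", 7), ("e.com", 2)],
    "node-c": [("z.com", 100)],  # has no data for this job, so it is never queried
}


def slave_query(node):
    """Remote operation: a slave sorts only its own subset, largest count first."""
    return sorted(NODE_DATA[node], key=lambda row: row[1], reverse=True)


def master_query(job):
    """Local operation: merge the already-sorted partial results into one list."""
    partials = [slave_query(node) for node in JOB_NODES[job]]
    return list(heapq.merge(*partials, key=lambda row: row[1], reverse=True))


result = master_query("url-counts")
print(result)
```

Because the mapping is consulted on every query, changing the entry for a job is all it takes in this sketch to redirect queries after data moves to a different node.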

Query Example

The best way to understand how the query system works is to look at an example.  We have developed a query language specific to querying data from trees.  Imagine that we have a tree that has the path:  ymd/{date}/{type}/{url}, where the values inside the brackets are data elements populated from the input values added to the tree and ‘ymd’ is a constant that represents the root of the tree.  The following example will query data from this tree for the dates 110322 and 110323 for any type.

The results for this query would look like:

So this query returns the number of times we saw a URL on a certain day.  The ‘+:+count’ fragment at the end of the query instructs the query system to return the metadata ‘count’ for that node in the tree. Count is a data attachment that is automatically attached to every node in the tree and simply counts the number of times a specific path through the tree has been followed. If we replace ‘+110322,110323’ with ‘110322,110323’, the dates are removed from the result set but are still used to filter the data.  The results of the modified query would look like:

The QueryMaster also provides a query caching infrastructure based on the structure of the query.  This enables the QueryMaster to return data quickly in cases where the same query is executed multiple times without querying the slaves in the cluster.
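The caching idea can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the class name, the `cache_key` rule (ignore whitespace), and the placeholder query text are all hypothetical, and the real QueryMaster derives its cache key from the structure of the query in a way this post does not describe.

```python
class CachingQueryMaster:
    """Hypothetical wrapper that remembers results per (job, query) pair."""

    def __init__(self, run_on_slaves):
        self._run_on_slaves = run_on_slaves  # callable that fans the query out
        self._cache = {}

    @staticmethod
    def cache_key(job, query):
        # Assumption: two queries are "the same" if they match once all
        # whitespace is removed. The real key is based on query structure.
        return (job, "".join(query.split()))

    def query(self, job, query):
        key = self.cache_key(job, query)
        if key not in self._cache:
            self._cache[key] = self._run_on_slaves(job, query)
        return self._cache[key]


calls = []


def fake_cluster(job, query):
    """Stand-in for the slaves; records every time it is actually invoked."""
    calls.append(query)
    return [("example.com", 42)]


master = CachingQueryMaster(fake_cluster)
first = master.query("job-1", "<query text>")
second = master.query("job-1", " <query text> ")  # served from the cache
```

A production cache would also need an eviction or expiry policy so that results do not go stale as new data arrives; that is left out here.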

 

Query Operations

Query operations allow a query designer to transform the results of a query to meet their needs.  Here are a few common operations available in the system.

Each of these operations can be executed on the QueryMaster and/or the QuerySlave. As an example, suppose we wanted to count the number of times a URL was shared, as we did in the example above. In this case we could use the following operations:

If the raw data on a data node looked like:

Then a QuerySlave, using the operation ‘gather=ks’, would return:

The QueryMaster will receive result sets like the one above from each QuerySlave in the cluster and then perform the gather operation on all of those results.  Finally the QueryMaster will sort the results in descending order using the count column as the sort key.
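The two-stage gather and the final sort can be sketched as follows. This is a rough Python illustration rather than the actual operation syntax: it assumes that gathering means grouping rows by the URL column and summing the count column, and the sample rows are made up.

```python
from collections import Counter


def gather(rows):
    """Group (url, count) rows by URL and sum their counts."""
    totals = Counter()
    for url, count in rows:
        totals[url] += count
    return list(totals.items())


def query_master(slave_rows):
    """Gather on each slave, gather again on the master, then sort descending."""
    partials = [gather(rows) for rows in slave_rows]           # remote step
    merged = gather(row for part in partials for row in part)  # local step
    return sorted(merged, key=lambda row: row[1], reverse=True)


# Made-up raw rows on two data nodes.
slave_1 = [("a.com", 1), ("b.com", 1), ("a.com", 1)]
slave_2 = [("b.com", 1), ("b.com", 1), ("c.com", 1)]
print(query_master([slave_1, slave_2]))
```

Running the gather on each slave first means the master only receives one row per distinct URL per node, which is the reduction in transferred data described earlier in the post.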

 

Conclusion

This post has provided a high-level overview of the distributed query engine that provides fast and flexible access to the data stored in our clusters.  The final post in the series will focus on the command and control subsystem that orchestrates job execution and resource management in our clusters.  If you’re interested in learning more about Hydra and other big data architectures please join us at the next Big Data DC monthly meetup.

 

Clearspring’s Big Data Architecture, Part 2

This is part two of a four part series on Clearspring’s approach to big data analytics. (See part 1.)  This post will focus on building tree-based data structures to store the massive quantity of information that Clearspring receives each day. Trees have several useful characteristics that make them the ideal data structure for our needs. For example, trees provide a compressed representation, an index, of our data where a path through a tree represents a row of data. Trees can also be sharded and distributed throughout our clusters which allows us to create massive trees composed of hundreds of different shards. At query time we can easily gather and merge information from each shard.
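As a small illustration of these properties, the Python sketch below builds a tree in which each path is a row, keeps a count on every node, and merges two shards at query time. The `Node` class and the sample dates and URLs are hypothetical; this is not how Hydra stores its trees on disk.

```python
class Node:
    """A tree node that counts how many times a path has passed through it."""

    def __init__(self):
        self.count = 0
        self.children = {}

    def add(self, path):
        """Follow (and create as needed) the path below this node."""
        self.count += 1
        if path:
            child = self.children.setdefault(path[0], Node())
            child.add(path[1:])

    def merge(self, other):
        """Fold another shard's tree into this one by summing counts."""
        self.count += other.count
        for key, child in other.children.items():
            self.children.setdefault(key, Node()).merge(child)

    def rows(self, prefix=()):
        """Yield (path, count) for every leaf: one row per complete path."""
        if not self.children:
            yield prefix, self.count
        for key, child in sorted(self.children.items()):
            yield from child.rows(prefix + (key,))


shard_a, shard_b = Node(), Node()
for row in [("110322", "a.com"), ("110322", "a.com"), ("110322", "b.com")]:
    shard_a.add(row)
for row in [("110322", "a.com"), ("110323", "a.com")]:
    shard_b.add(row)

shard_a.merge(shard_b)
merged_rows = list(shard_a.rows())
```

Repeated rows share nodes instead of being stored again, which is where the compression comes from.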

 

Tree Building Process

The foundation of our distributed analytics framework is the tree building process. We selected a tree-based storage structure for our processed data because we feel that it fills a useful middle ground between the datamodel-centric view of row oriented databases and the query-centric view more common with column-oriented databases.

The TreeBuilder consumes data from one or more data sources (typically a set of Stream Servers) and then builds a tree based on the input data and a job configuration that defines the desired tree structure. An example tree might look like:

One of the more interesting aspects of our tree data structure is the concept of data attachments. We often run into situations where we’d like to store certain bits of information in a way that is easily retrievable. Some nodes in a tree may have many millions of children, so questions like “which URLs were clicked the most times?” can be very expensive. This is where the data attachment comes into play. We can use operations from our recently open sourced stream-lib project to find and store the “top” URLs as a data attachment on the parent node. This top element is dynamically updated anytime new data is added to the tree. As each new record is processed the data attachment is updated to reflect the current list of n top elements. Tops may be lossy but if the top is large enough the results are generally accurate enough for our needs. Stream-lib is a very powerful library that makes it possible to count a huge number of elements in a distributed system, find the top-k elements from massive lists of data, and perform membership detection using Bloom filters. You can read more about stream-lib here or check out the source code on GitHub.
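To show why a bounded “top” is lossy, here is a minimal Python sketch of one well-known approach, the Space-Saving algorithm. It is a swapped-in technique chosen for illustration; the post does not say which algorithm Hydra’s top attachment uses, and the `TopK` class is hypothetical rather than part of stream-lib’s API.

```python
class TopK:
    """Bounded 'top' counter using the Space-Saving algorithm (lossy)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.counts = {}

    def offer(self, item):
        """Record one occurrence of item, updating the top as we go."""
        if item in self.counts:
            self.counts[item] += 1
        elif len(self.counts) < self.capacity:
            self.counts[item] = 1
        else:
            # Evict the current minimum; the newcomer inherits its count
            # plus one. This overestimate is where accuracy is lost.
            victim = min(self.counts, key=self.counts.get)
            self.counts[item] = self.counts.pop(victim) + 1

    def top(self, n):
        """Return the n items with the highest (estimated) counts."""
        ranked = sorted(self.counts.items(), key=lambda kv: kv[1], reverse=True)
        return ranked[:n]


topurls = TopK(capacity=3)
for url in ["a"] * 5 + ["b"] * 3 + ["c", "d"]:
    topurls.offer(url)
```

In this run the heavy hitters “a” and “b” keep exact counts, while “d” replaces “c” and carries an inflated count of 2. With a larger capacity, such errors stay confined to the tail.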

Let’s take a look at an example JSON configuration used to create a tree:

In the example above we create a simple tree with the root “urltree”. Underneath “urltree” we have two branches, “ymd” and “ym”, representing data broken down by day and by month. Each of those branches has two levels of children. The first level contains a list of dates and the second level has a list of URLs. The first level of both branches has one data attachment called “topurls”. This data attachment will contain a list of the URLs that were seen most often. So if we want to know which URL was seen the most times in the month of May or on a specific day in May, that data can be retrieved quickly without iterating over all of the children of the date node. The resulting tree with sample data may look like:

We use Berkeley DB JE as our underlying database for each job. Berkeley DB JE is a powerful database and provides a nice abstraction layer to the data structure we wanted to use. It is very common for us to have jobs that store more data than we can practically fit into memory. We created a library called PageDB that provides an abstraction layer between Berkeley DB JE and the rest of our system. PageDB tracks and estimates memory consumption for each database that is paged into memory. It groups keys and their values together for efficient block-level compression, which reduces the disk IO required to store and retrieve data. Rather than storing first-class Java objects in memory, PageDB keeps the data in compressed byte format, which is significantly more efficient than storing those objects in the heap.
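The idea of grouping keys and values into a page and compressing the page as a block can be sketched in Python. This is an illustration only, not PageDB’s format: the function names are hypothetical, and JSON plus zlib stand in for whatever serialization and compression PageDB actually uses.

```python
import json
import zlib


def encode_page(entries):
    """Pack a group of sorted key/value pairs into one compressed block."""
    payload = json.dumps(sorted(entries.items())).encode("utf-8")
    return zlib.compress(payload)


def decode_page(block):
    """Inflate a block back into a dict only when it is actually needed."""
    return dict(json.loads(zlib.decompress(block).decode("utf-8")))


# Made-up page of URL keys with similar prefixes, which compress well together.
page = {f"http://example.com/page/{i}": i for i in range(200)}
block = encode_page(page)
raw_size = len(json.dumps(sorted(page.items())))
print(raw_size, len(block))
```

Keys that sort next to each other tend to share prefixes, so compressing them as one block is far more effective than compressing each value on its own.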

 

What’s Next?

Parts one and two of this series introduced Hydra, Stream Servers, and the tree-building process. These components provide the foundation necessary to discuss the distributed query engine we use to retrieve information from our clusters and that will be the topic of the next post in this series. If you’d like to work on problems like these we are always hiring.

Processing Our Data

Currently taking place on the Clearspring blog is a four part series about how our team processes tens of billions of unique, new data points on a daily basis.

In storage terms, we ingest 4 to 5 TB of new data each day; that could easily double in 12-18 months. That data must be processed as live streams, re-processed from the archive as new algorithms are developed, and made queryable in flexible ways. An interesting mix of batch, live and hybrid jobs is employed.

So head on over to the Clearspring blog to learn all about the wizardry!

Clearspring’s Big Data Architecture, Part 1

This is the first post in a four-part series that will describe the data analytics architecture Clearspring has created to process large amounts of semi-structured data.  Check out post 2 and post 3 when you’re done reading this post.

On a daily basis, the Clearspring network yields tens of billions of new, unique data points. In storage terms, we ingest 4 to 5 TB of new data each day; that could easily double in 12-18 months. That data must be processed as live streams, re-processed from the archive as new algorithms are developed, and made queryable in flexible ways. An interesting mix of batch, live and hybrid jobs is employed. To keep pace with a rapidly expanding network, the architecture must not only efficiently handle the current set of tasks, but it must also enable significant growth while guarding against hardware and software faults. To satisfy these needs, we built an entire software suite that leverages open source technologies and many in-house software components.

 

Desired System Characteristics and Challenges

  1. Efficient and reliable storage of large amounts of data on unreliable machines in a distributed system with fast writes, data partitioning and replication, and data streaming capabilities.
  2. Knowledge discovery.  Our system consumes a large amount of raw data that in and of itself is not very interesting so we need to be able to extract the interesting information by analyzing large streams of data from multiple sources.
  3. Management of distributed resources.  We wanted the system to distribute jobs in the cluster, determine when it is safe to execute the job, report failures, and provide a nice GUI to visualize the current state of the cluster.
  4. Speed.  The small things add up when you do them over a billion times.  For example, if we have 2.5 billion messages and we shave 1 millisecond (.001 seconds) off the time it takes to process a single event we will have saved nearly 29 days of computation time.  In our case, milliseconds matter!
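The arithmetic behind the “nearly 29 days” figure in the last item can be checked with a few lines of Python, using only the numbers given above:

```python
messages = 2_500_000_000          # 2.5 billion messages
saved_ms_per_message = 1          # 1 millisecond saved per message

saved_seconds = messages * saved_ms_per_message / 1000
saved_days = saved_seconds / (60 * 60 * 24)
print(round(saved_days, 1))
```

That is 2,500,000 seconds, which is a little under 29 days of computation.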

There are many systems available that provide some of the characteristics listed above, including enterprise SANs, HDFS, OpenStack (Swift), CouchDB, Redis, ElephantDB, Voldemort, Hadoop, Cassandra, and others.  We do use some of these projects for various components of our architecture. For example, we use Cassandra for our distributed counting system, which keeps track of how many times a URL is shared and serves over 200M view requests daily. However, the primary system we use for our distributed processing needs is a project we call Hydra.

 

Hydra

The Hydra system we created internally has four main functional areas:
  1. Capability to partition, store, and stream data in a distributed system.
  2. Software that takes an input data stream, processes that data (filtering, counting, membership detection, etc.) and then stores the batch output in Berkeley DB JE, which uses sequential writes and indexes the batch output for fast reads.
  3. A set of software modules that enable ad-hoc queries against tree-based data structures.
  4. Command and control subsystem responsible for monitoring and executing jobs in the cluster.

 

Streaming and Splitting Data

We have too much raw data to store on a single machine and we were not interested in purchasing an enterprise SAN capable of storing petabytes of data.  While we looked at several distributed file systems like HDFS, GlusterFS, and others, we ended up going with a standard ext3 filesystem and implemented our own distribution and replication framework on top of that standard file system. There are two standard modules, Splitters and Stream Servers.

Splitter

The Splitter consumes an input stream from a data source.  The data source could be a flat file on the local system, an input stream sourced by a Stream Server, a firehose of data exposed via an API, or any other source you can think of.  The format of the source data from the stream is described to the Splitter (in the near future we will be using ZooKeeper to store stream formatting information).  Each Splitter consumes a subset of the data based on a hash function. The Splitter then uses the stream format information and the job configuration script (usually a JSON file) to filter, transform, and create data elements which will be stored in n partitions on the local file system.  After a checkpoint is reached (all available input data has been consumed or some time limit has been reached), the files the Splitter stores are made available to other processes in the cluster via the Stream Servers.
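Hash-based partitioning of this kind can be sketched in Python as follows. The function names and sample records are hypothetical, and CRC32 is used only because it gives the same answer on every machine; the post does not say which hash function the Splitter uses.

```python
import zlib


def partition_for(key, partitions):
    """Map a record key to a partition using a stable hash (CRC32 here)."""
    return zlib.crc32(key.encode("utf-8")) % partitions


def split(records, partitions):
    """Distribute (key, value) records into n partition buckets by key."""
    buckets = [[] for _ in range(partitions)]
    for key, value in records:
        buckets[partition_for(key, partitions)].append((key, value))
    return buckets


# Made-up input records keyed by URL.
records = [(f"http://example.com/{i % 10}", i) for i in range(100)]
buckets = split(records, partitions=4)
```

Because the mapping depends only on the key, every record for a given URL lands in the same partition no matter which Splitter processes it.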

Splitter processes are responsible for ensuring that they do not reprocess data that has already been consumed.  They do this by storing metadata in their local job directory that indicates which files they’ve consumed and an offset indicating how far into those files the process has read.  Because we only append to log files and never modify existing content, the Splitter process doesn’t need to re-read data it has already processed when more data is appended to a file.  We feel having each client track its own state is a better approach than having the Stream Server attempt to track the current state of every consumer in the cluster.
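A minimal Python sketch of this client-side bookkeeping is shown below. The file names and the JSON state format are assumptions made for the example, not the Splitter’s real metadata layout, and the sketch assumes writers only ever append complete lines.

```python
import json
import os
import tempfile


def consume_new_lines(log_path, state_path):
    """Read only the bytes appended since the last recorded offset."""
    state = {}
    if os.path.exists(state_path):
        with open(state_path) as fh:
            state = json.load(fh)
    offset = state.get(log_path, 0)

    with open(log_path, "rb") as fh:
        fh.seek(offset)              # skip everything already processed
        data = fh.read()
        state[log_path] = fh.tell()  # remember how far we have read

    with open(state_path, "w") as fh:
        json.dump(state, fh)
    return data.decode("utf-8").splitlines()


workdir = tempfile.mkdtemp()
log = os.path.join(workdir, "events.log")
state_file = os.path.join(workdir, "splitter-state.json")

with open(log, "a") as fh:
    fh.write("event-1\nevent-2\n")
first = consume_new_lines(log, state_file)

with open(log, "a") as fh:
    fh.write("event-3\n")
second = consume_new_lines(log, state_file)
```

The second call returns only the newly appended line, because the consumer resumes from its own stored offset rather than asking the server what it has already seen.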

This simple setup can accomplish some very powerful things beyond the obvious capability of storing and partitioning raw data to a local node in the cluster.  One useful function the Splitter provides is recombining many small files into a smaller number of time sorted files.  This can be accomplished by having n splitter nodes consume from m sources where n < m.  All of the data from the data sources will be consumed but they will be persisted into a smaller number of files than originally required.

Stream Server

Stream Servers provide access to the files stored on one node to any other node in the cluster. The Stream Server transmits raw bytes to clients because the files may be stored in a compressed format. This limits the number of times the data in the file is decoded and reduces the total amount of data transmitted over the network. This approach does prevent the Stream Server from applying fine-grained filters to the individual log lines. However, we think that this limitation is acceptable given the CPU cycles saved by only decoding the file once on the client side and the massive reduction in network IO.

Requests for data come into the Stream Server in the form of regular expressions which describe which files the client would like to receive.  Here is an example of what a request might look like:

 

 

The snippet above would tell the Stream Server to search every directory matching ‘job/1/*/live/split/us’ for files whose names match the regular expression defined in the match field.  The date format will be replaced at run-time with the set of dates the client is interested in and the mod variable will be replaced with the modulus for the client.  This approach enables a client to stream data for a specific partition and time range without requiring complex filtering on the server side.
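The selection step can be illustrated with a short Python sketch. Everything specific in it is invented for the example: the file naming scheme, the zero-padded modulus suffix, and the `build_pattern` helper are assumptions and do not reproduce the actual request format.

```python
import re

# Hypothetical file listing on a Stream Server; these names are invented.
FILES = [
    "job/1/0/live/split/us/110322-000.gz",
    "job/1/0/live/split/us/110322-001.gz",
    "job/1/1/live/split/us/110323-001.gz",
    "job/1/1/live/split/eu/110323-001.gz",
    "job/1/2/live/split/us/110324-001.gz",
]


def build_pattern(dates, mod):
    """Fill in the date set and the client's modulus, then compile the regex."""
    date_alt = "|".join(re.escape(d) for d in dates)
    return re.compile(rf"^job/1/[^/]+/live/split/us/({date_alt})-{mod:03d}\.gz$")


def matching_files(dates, mod):
    """Return the files a client with this modulus should stream."""
    pattern = build_pattern(dates, mod)
    return [path for path in FILES if pattern.match(path)]


selected = matching_files(["110322", "110323"], mod=1)
print(selected)
```

The server only has to match file names, so all record-level work stays on the client.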

 

Conclusion

This was part one in a series of posts where we will explain how Clearspring processes and stores multiple terabytes of data daily.  In upcoming articles we will explore how and why we use a tree-based data structure, our distributed query system, and the command and control infrastructure that binds it all together.  Check out post 2 and post 3 to learn more.  Subscribe to our RSS feed or follow us on Twitter to stay informed.

 

New Open Source Stream Summarizing Java Library

Here at Clearspring we like to count interesting things. How many users have visited a site? How many unique URLs are shared every hour? It turns out that counting is a non-trivial problem when there are several billion things to count. And counting is only the first step. What about the frequency of the things you are counting? Maintaining a complete multiset — with billions of elements indexed by multiplicity — for each dimension of interest is rarely practical. So, we’ve developed a set of utilities to help make counting to a billion easy.

Today we are pleased to release those utilities under an open-source license. Stream Lib is a Java library for summarizing streams of data. Included are classes for estimating:

  • Cardinality (aka “counting things”): Instead of storing an entire set of elements it is possible to instead construct a compact binary object that will provide a tunable estimate to how many distinct elements have been seen. This reduces memory requirements by orders of magnitude. There is a significant body of academic literature on approaches to this problem. We have tried to provide useful implementations of those ideas.
  • Set membership: Bloom filters provide a space-efficient way to test for set membership. They have the useful property that there are no false negatives, only false positives within whatever bounds you specify. The Wikipedia page has a good section on the interesting variants that have been developed over recent years. We have adapted Apache Cassandra’s well-tested implementation for standalone use.
  • Top-k elements: While counting the number of distinct elements with a cardinality estimator is cool, sometimes (well often) you also want to know something about those elements (such as the most frequent ones). We have some early work in this area (a stochastic topper).
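To illustrate the set membership case, here is a toy Bloom filter in Python. It is a teaching sketch and not Stream Lib’s Java implementation: the class, its default sizes, and the use of SHA-256 to derive bit positions are all choices made for this example.

```python
import hashlib


class BloomFilter:
    """Toy Bloom filter: may report false positives, never false negatives."""

    def __init__(self, size_bits=1024, hashes=3):
        self.size = size_bits
        self.hashes = hashes
        self.bits = bytearray(size_bits // 8 + 1)

    def _positions(self, item):
        """Derive one bit position per hash by salting the item with a seed."""
        for seed in range(self.hashes):
            digest = hashlib.sha256(f"{seed}:{item}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.size

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, item):
        """True means 'possibly present'; False means 'definitely absent'."""
        return all(
            self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item)
        )


seen = BloomFilter()
urls = [f"http://example.com/{i}" for i in range(100)]
for url in urls:
    seen.add(url)
```

Every URL that was added is reported as possibly present, because adding an item sets exactly the bits that the lookup checks. Items that were never added are usually rejected, but can occasionally collide with bits set by others.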

There is a Readme to get you started with the code. We hope that others find this as useful as we have. Feedback, comments, patches, bugs, and forks are all welcome.

Think this is cool? Apply for a job to join the team!

AddThis for Developers

We recently launched an AddThis for Developers page to encourage coders to get even more out of our sharing tools. In particular, we wanted to highlight our APIs and make them more discoverable:

Services API
The Services API provides a list of our supported sharing services, along with their icons and service meta-data in JSON, JSONP and XML formats.

Client API
The Client API is a client-side JavaScript API that allows you to customize the appearance and behavior of AddThis tools on your pages – with analytics.

Analytics API
The Analytics API is a set of web services that give you programmatic access to your analytics data. Integrate our metrics into your own tools and workflow.

Also, if you are a developer who has created something awesome using the AddThis APIs, like the user who developed an AddThis plug-in for Opera, please send it our way and we’d love to feature it.

Using HTML 5 to Make the Analytics Charts

What happened when you attempted to access the old analytics on an iPhone.

When we decided to redesign and update our analytics, one of the first things we looked at was updating our charts. Our old charts were Flash-based and as such didn’t work on iPhones or iPads and tended to load a little slowly. We wanted to fix that with this release.

I started by looking at a wide range of options. The number of options for charts is staggering. In the end, we decided to implement a mixture of Flot for the daily line charts you see at the top of most pages, CSS for our bar charts, and Google Charts for our maps. Let’s see how we specifically implemented each of these.

Site Maintenance Tomorrow Morning

All,

We will be performing routine maintenance to the site tomorrow morning, 08/31/2010 @ 4:00am EST. The maintenance is expected to last about two hours. During this period, there will be no access to the Forum nor will modifications to email templates be available.

Please note that this maintenance will in no way affect sharing, analytics or overall button performance.

As always, thank you for using AddThis.

The AddThis Team

Best Practices Guide for OExchange Targets

OExchange has a new Best Practices Guide for service providers. It has some suggestions for avoiding common UX and functional bugs we’ve seen on sharing sites over the years:

  • Welcome new users on your Offer page
  • Preserve URL parameters through the login process
  • Check your URL encoding and decoding
  • Don’t resize the browser window

These are great tips for any sharing site; they’ll improve the user experience and make it easier to share.  The guide also shows you how to test your site using the OExchange tools. Check it out:

http://www.oexchange.org/tools/bestpractices/

We’re going to roll out improved Service Directory support for OExchange-compatible services in the next month or so. Stay tuned!