All posts by Matt Abrams

How We Count Our Users

We want our count of unique users that the AddThis network sees per month to be one our customers can believe in.  This post explains our methodology for generating that number.

There are 2 numbers to consider:

  • Raw uniques – The raw number is really a count of all cookies, including bots and everything.
  • Good UID – We use a “good UID” count, a number that we know is lower than our actual reach, because we would rather be conservative.

Here’s how we get to our “good” UID:

  • Start with time component:  We start with every cookie that has been seen at least twice, with sightings more than 24 hours apart, because we need to be sure that we do not include bots and consumers who constantly clear their caches.  The time component is crucial to an accurate count because there are many cases where transient consumers appear in the log files multiple times in a narrow time period but then disappear from the logs a short while later.  A filter that counts any consumer seen multiple times would count this activity incorrectly and cause the unique user count to be inflated.
  • Count Unique Monthly Users: Then we take a de-duplicated count across all of the days in the month we are measuring to get the total count.  As we have discussed in another blog post, this is not a trivial task and it is tempting to take the easy way out and simply sum the total unique users by day in order to get the final unique user count.  Doing so would overestimate the actual number of unique users counted.
  • Consider multiple devices: Unfortunately there is no reasonable way to detect when the same person uses multiple devices to access the internet, which is very common in the US and other westernized countries.  For this reason, we take the floor of our total unique user count to reduce the impact of overcounting multiple devices.
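
The first two steps above can be sketched in a few lines. This is only an illustration under stated assumptions: events are taken to be `(cookie_id, timestamp)` pairs, and the function name `good_uids` and the sample data are hypothetical rather than taken from our pipeline.

```python
from datetime import datetime, timedelta

def good_uids(events, min_gap=timedelta(hours=24)):
    """Return cookie IDs whose first and last sightings are more than
    `min_gap` apart (which implies they were seen at least twice)."""
    first_seen = {}
    last_seen = {}
    for cookie_id, ts in events:
        if cookie_id not in first_seen or ts < first_seen[cookie_id]:
            first_seen[cookie_id] = ts
        if cookie_id not in last_seen or ts > last_seen[cookie_id]:
            last_seen[cookie_id] = ts
    return {c for c in first_seen if last_seen[c] - first_seen[c] > min_gap}

# Hypothetical sample events for one month.
events = [
    ("a", datetime(2012, 5, 1, 9, 0)),
    ("a", datetime(2012, 5, 3, 9, 0)),   # returns two days later: kept
    ("b", datetime(2012, 5, 1, 9, 0)),
    ("b", datetime(2012, 5, 1, 9, 5)),   # burst within minutes: dropped
    ("c", datetime(2012, 5, 2, 12, 0)),  # seen once: dropped
]

# De-duplicated across the whole month, because the result is a set.
monthly_good = good_uids(events)

# The tempting shortcut: sum the distinct cookies seen on each day.
daily = {}
for cookie_id, ts in events:
    daily.setdefault(ts.date(), set()).add(cookie_id)
sum_of_daily = sum(len(ids) for ids in daily.values())

print(len(monthly_good), sum_of_daily)
```

With this sample data the daily sum counts cookie "a" on two separate days, which is why summing daily uniques overstates the monthly figure.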

Even with our conservative counting approach, we’re proud to be on sites that reach 1.3B users on a monthly basis, making us the largest social sharing platform on the open web. We’d love your feedback and would be happy to expand on any aspect of our methodology.

Probabilistic Counting

At Clearspring we like to count things. Counting the number of distinct elements (the cardinality) of a set is a challenge when the cardinality is large.

To better understand the challenge of determining the cardinality of large sets let’s imagine that you have a 16 character ID and you’d like to count the number of distinct IDs that you’ve seen in your logs. Here is an example:


These 16 characters represent 128 bits. 65k IDs would require 1 megabyte of space. We receive over 3 billion events per day and each event has an ID. Those IDs require 384,000,000,000 bits, or about 45 gigabytes of storage. And that is just the space the ID field requires! To get the cardinality of IDs in our daily events we could take a simplistic approach. The most straightforward idea is to use an in-memory hash set that contains the unique list of IDs seen in the input files. Even if we assume only 1 in 3 records is unique, the hash set would still take 119 gigs of RAM, not including the overhead Java requires to store objects in memory. You would need a machine with several hundred gigs of memory to count distinct elements this way, and that is only to count a single day’s unique IDs. We certainly don’t have a single machine with several hundred gigs of free memory sitting around, so we need a better solution.
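
The raw storage figures can be checked with a little arithmetic. The sketch assumes one byte per character and reads "megabyte" and "gigabyte" as powers of two; the variable names are illustrative only.

```python
ID_BYTES = 16                      # 16 characters at one byte each = 128 bits
EVENTS_PER_DAY = 3_000_000_000     # "over 3 billion events per day"

# 65,536 IDs of 16 bytes each fill exactly one mebibyte.
bytes_for_65k = 65_536 * ID_BYTES

# A full day of IDs, expressed in bits, bytes and gibibytes.
daily_bits = EVENTS_PER_DAY * ID_BYTES * 8
daily_bytes = daily_bits // 8
daily_gib = daily_bytes / 2**30

print(bytes_for_65k, daily_bits, daily_bytes, round(daily_gib, 1))
```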

One common approach to this problem is the use of bitmaps. Bitmaps can be used to quickly and accurately get the cardinality of a given input. While bitmaps drastically reduce the space requirements from the naive set implementation described above, they are still problematic when the cardinality is very high and/or you have a very large number of different sets to count.  If we want to count to one hundred million using a bitmap, we will need one hundred million bits, or roughly 12 megabytes, for each counter.  Sparse bitmaps can be compressed in order to gain space efficiency, but our bitmaps are not sparse. It is also not uncommon for our jobs to contain hundreds or even thousands of counters, so we require a more space-efficient solution.
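
To make the bitmap idea concrete, here is a minimal sketch of an exact bitmap counter. It assumes each ID has already been mapped to an integer in a known range, which the paragraph above does not spell out; the class name `BitmapCounter` is hypothetical.

```python
class BitmapCounter:
    """Exact distinct counter for integer IDs in the range [0, capacity)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.bits = bytearray((capacity + 7) // 8)   # one bit per possible ID

    def add(self, item_id):
        if not 0 <= item_id < self.capacity:
            raise ValueError("id out of range")
        self.bits[item_id >> 3] |= 1 << (item_id & 7)

    def cardinality(self):
        # Count the bits that have been set.
        return sum(bin(byte).count("1") for byte in self.bits)

counter = BitmapCounter(1000)
for item_id in [3, 7, 3, 42, 7]:
    counter.add(item_id)          # duplicates set the same bit again
distinct = counter.cardinality()

# Space needed for a counter that can reach one hundred million.
bytes_needed = (100_000_000 + 7) // 8
print(distinct, bytes_needed)
```

The size is fixed by the capacity, not by how many IDs are actually seen, which is why thousands of such counters become expensive.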

Luckily for us, cardinality estimation is a popular area of research. We’ve leveraged this research and implemented, and open sourced, several distributed probabilistic cardinality estimation algorithms. We will not go into the details of how these algorithms work in this post, but if you are interested you can find references to the relevant research papers in our GitHub project.  In general these algorithms provide a space-efficient mechanism for estimating the cardinality of a set, but with less than perfect accuracy. They do this by representing the count as a set of bit fields and using a hash function to determine the relevant bucket and bit for the element being added to the set. One algorithm, the LogLog cardinality estimator, is amazingly space efficient. Using LogLog you can count up to 100 million distinct elements with 4% error using just 640 bytes.

So now that we can count the number of distinct elements in huge sets using probabilistic cardinality estimators we have to solve for another problem. Clearspring has close to 2 petabytes of storage spread across several hundred servers. We store partitions of our data on each server so in order to get the cardinality of the full data set we need to combine the results from each of the servers. Let’s look at an example.  Imagine that we have 3 servers and the table below represents the cardinality for the data on each of those servers:



Server  Cardinality
1       1000
2       1500
3       2500

We cannot simply add up the cardinality from each server and say that our total cardinality is 5000. That would assume that the data is partitioned in such a way that guarantees that no ID appears on more than one machine. If that condition is not true, as is often the case, we would be over estimating the cardinality by summing the results. This is where the true power of cardinality estimators really shines through. In addition to being space efficient, the cardinality estimators we use are also mergeable. This means that we can take the cardinality estimate, which is represented as an array of bits, from multiple machines and merge them together to get a cardinality estimate for the global set.
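
The merge property can be illustrated with a small sketch. To keep it short it uses linear counting, one of the simpler bitmap-based estimators, rather than LogLog, and it is not the stream-lib implementation. The class name, the bitmap size, the choice of SHA-1 as the hash and the overlapping ID ranges are all assumptions made for the example.

```python
import hashlib
import math

class LinearCounter:
    """Linear-counting estimator: a fixed-size bitmap addressed by a hash."""

    def __init__(self, num_bits=1 << 16):
        self.num_bits = num_bits              # must be a multiple of 8
        self.bits = bytearray(num_bits // 8)

    def add(self, item):
        digest = hashlib.sha1(str(item).encode("utf-8")).digest()
        index = int.from_bytes(digest[:8], "big") % self.num_bits
        self.bits[index >> 3] |= 1 << (index & 7)

    def estimate(self):
        set_bits = sum(bin(b).count("1") for b in self.bits)
        zero_bits = self.num_bits - set_bits
        if zero_bits == 0:
            return float("inf")   # bitmap saturated; a larger one is needed
        return -self.num_bits * math.log(zero_bits / self.num_bits)

    def merge(self, other):
        """Combine two estimators by OR-ing their bitmaps."""
        if other.num_bits != self.num_bits:
            raise ValueError("cannot merge estimators of different sizes")
        merged = LinearCounter(self.num_bits)
        merged.bits = bytearray(a | b for a, b in zip(self.bits, other.bits))
        return merged

# Three servers holding 1000, 1500 and 2500 IDs, with overlap between them.
servers = [range(0, 1000), range(500, 2000), range(1500, 4000)]
counters = []
for ids in servers:
    counter = LinearCounter()
    for item in ids:
        counter.add(item)
    counters.append(counter)

summed = sum(c.estimate() for c in counters)                 # overcounts
merged = counters[0].merge(counters[1]).merge(counters[2])
global_estimate = merged.estimate()                          # near the true union
print(round(summed), round(global_estimate))
```

Here the true number of distinct IDs across the three servers is 4000. Summing the per-server estimates lands near 5000, while the merged estimator stays close to 4000 because an ID seen on two servers sets the same bit on both.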

If estimating the cardinality of large sets is a problem you have, then we recommend using probabilistic counters, and if you happen to use Java then our stream-lib project is a great place to start.

Want to work on problems like these?  We are hiring.

Clearspring’s Big Data Architecture, Part 3

Querying Data

This is Part 3 of a four part (Part 1, Part 2) series of blog posts discussing Clearspring’s big data architecture.  This post will describe the distributed query system we use to quickly access terabytes of data that is distributed across hundreds of machines. The query subsystem is comprised of two key components, QueryMaster and QuerySlave.  A single cluster can have multiple QueryMasters and each processing node in the cluster will have one or more QuerySlaves.

The QueryMaster accepts a query, identifies which job the query is for, sends the query to the QuerySlaves that have data for the specified job, and then aggregates the responses from each slave into a single result set that is provided back to the client.

QuerySlaves and the QueryMaster communicate via ZooKeeper and RabbitMQ.  It is important to store the mapping of jobs to nodes in a distributed configuration management system like ZooKeeper because the system is not static.  Data in the cluster can move from one node to another when the system re-balances data or when a node fails.  This dynamic mapping allows the QueryMaster to only query the nodes we know have data and dynamically change the set of nodes queried whenever the location of data in the cluster changes.

The query is broken down into local and remote operations.  Operations are things like aggregate, sort, pivot, multiply, filter, concatenate, and so on.  Local operations are performed by the QueryMaster and remote operations are performed by the QuerySlaves.  Remote operations allow the system to take advantage of the system resources available in the cluster.  Having each node sorting a subset of the data is much more efficient than having the QueryMaster sort the full set of unsorted data.  Remote operations can also significantly reduce the amount of data transferred between the QuerySlave and QueryMaster.

Query Example

The best way to understand how the query system works is to look at an example.  We have developed a query language specific to querying data from trees.  Imagine that we have a tree that has the path:  ymd/{date}/{type}/{url}, where the values inside of the brackets are data elements that are populated with the input values that are added to the tree and ‘ymd’ is a constant that represents the root of the tree.  The following example will query data from this tree for the dates 110322 and 110323 for any type.

The results for this query would look like:

So this query returns the number of times we saw a URL on a certain day.  The ‘+:+count’ fragment at the end of the query instructs the query system to return the meta-data ‘count’ for that node in the tree. Count is a data attachment that is automatically attached to every node in the tree and simply counts the number of times a specific path through the tree has been followed. If we replace ‘+110322,110323’ with ‘110322,110323’ it will remove the dates from the result set but it will still use them to filter the data.  The results of the modified query would look like:

The QueryMaster also provides a query caching infrastructure based on the structure of the query.  This enables the QueryMaster to return data quickly in cases where the same query is executed multiple times without querying the slaves in the cluster.


Query Operations

Query operations allow a query designer to transform the results of a query to meet their needs.  Here are a few common operations available in the system.

Each of these operations can be executed on the QueryMaster and/or the QuerySlave. As an example suppose we wanted to count the number of times a URL was shared as we did in the example above. In this case we could use the following operations:

If the raw data on a data node looked like:

Then a QuerySlave, using the operation ‘gather=ks’, would return:

The QueryMaster will receive result sets like the one above from each QuerySlave in the cluster and then perform the gather operation on all of those results.  Finally the QueryMaster will sort the results in descending order using the count column as the sort key.
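
The division of work between the slaves and the master can be sketched as follows. The sketch assumes that the gather operation groups rows by a key column and sums a count column; the function name `gather`, the row layout and the sample URLs are hypothetical and are not the actual query engine.

```python
from collections import Counter

def gather(rows):
    """Sum the count column for rows that share the same key."""
    totals = Counter()
    for key, count in rows:
        totals[key] += count
    return list(totals.items())

# Hypothetical raw rows held by two data nodes.
node_a = [("example.com/a", 1), ("example.com/b", 1), ("example.com/a", 1)]
node_b = [("example.com/b", 1), ("example.com/c", 1), ("example.com/b", 1)]

# Remote operation: each QuerySlave gathers its own rows.
slave_results = [gather(node_a), gather(node_b)]

# Local operations: the QueryMaster gathers the partial results, then
# sorts them in descending order using the count column as the sort key.
combined = gather(row for result in slave_results for row in result)
final = sorted(combined, key=lambda row: row[1], reverse=True)
print(final)
```

Because each slave collapses its rows before replying, the master only receives one row per distinct key per node instead of every raw row.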



This post has provided a high-level overview of the distributed query engine that provides fast and flexible access to the data stored in our clusters.  The final post in the series will focus on the command and control subsystem that orchestrates job execution and resource management in our clusters.  If you’re interested in learning more about Hydra and other big data architectures please join us at the next Big Data DC monthly meetup.


Clearspring’s Big Data Architecture, Part 2

This is part two of a four part series on Clearspring’s approach to big data analytics. (See part 1.)  This post will focus on building tree-based data structures to store the massive quantity of information that Clearspring receives each day. Trees have several useful characteristics that make them the ideal data structure for our needs. For example, trees provide a compressed representation, an index, of our data where a path through a tree represents a row of data. Trees can also be sharded and distributed throughout our clusters which allows us to create massive trees composed of hundreds of different shards. At query time we can easily gather and merge information from each shard.


Tree Building Process

The foundation of our distributed analytics framework is the tree building process. We selected a tree-based storage structure for our processed data because we feel that it fills a useful middle ground between the datamodel-centric view of row oriented databases and the query-centric view more common with column-oriented databases.

The TreeBuilder consumes data from one or more data sources (typically a set of Stream Servers) and then builds a tree based on the input data and a job configuration that defines the desired tree structure. An example tree might look like:

One of the more interesting aspects of our tree data structure is the concept of data attachments. We often run into situations where we’d like to store certain bits of information in a way that is easily retrievable. Some nodes in a tree may have many millions of children, so questions like “which URLs were clicked the most times?” can be very expensive to answer. This is where the data attachment comes into play. We can use operations from our recently open sourced stream-lib project to find and store the “top” URLs as a data attachment on the parent node. This top element is dynamically updated anytime new data is added to the tree. As each new record is processed, the data attachment is updated to reflect the current list of n top elements. Tops may be lossy, but if the top is large enough the results are generally accurate enough for our needs. Stream-lib is a very powerful library that makes it possible to count a huge number of elements in a distributed system, find the top-k elements from massive lists of data, and perform membership detection using bloom filters. You can read more about stream-lib here or check out the source code on GitHub.
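
A rough sketch of how an attachment rides along with tree updates is shown below. It is a simplification under stated assumptions: the `TreeNode` class and `add_record` function are hypothetical, and an exact `Counter` stands in for the bounded, lossy top-k structure that stream-lib provides.

```python
from collections import Counter

class TreeNode:
    def __init__(self):
        self.count = 0              # times a path has passed through this node
        self.children = {}
        self.top_urls = Counter()   # simplified stand-in for a "top" attachment

def add_record(root, date, url):
    """Follow (or create) the path date/url, updating counts on the way."""
    root.count += 1
    date_node = root.children.setdefault(date, TreeNode())
    date_node.count += 1
    date_node.top_urls[url] += 1    # attachment updated as each record arrives
    url_node = date_node.children.setdefault(url, TreeNode())
    url_node.count += 1

root = TreeNode()
records = [("110322", "a.com"), ("110322", "b.com"),
           ("110322", "a.com"), ("110323", "b.com")]
for date, url in records:
    add_record(root, date, url)

# Answered from the attachment, without iterating over the URL children.
top = root.children["110322"].top_urls.most_common(1)
print(top)
```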

Let’s take a look at an example JSON configuration used to create a tree:

In the example above we create a simple tree with the root “urltree”. Underneath “urltree” we have two branches, “ymd” and “ym”, representing data broken down by day and by month. Each of those branches has two levels of children. The first level contains a list of dates and the second level has a list of URLs. The first level of both branches has one data attachment called “topurls”. This data attachment will contain a list of the URLs with the highest counts. So if we want to know which URL was seen the most times in the month of May or on a specific day in May, that data can be retrieved quickly without requiring iteration over all of the children of the date node. The resulting tree with sample data may look like:

We use Berkeley DB JE as our underlying database for each job. Berkeley DB JE is a powerful database and provides a nice abstraction layer to the data structure we wanted to use. It is very common for us to have jobs that store more data than we can practically fit into memory. We created a library called PageDB that provides an abstraction layer between Berkeley DB JE and the rest of our system. PageDB tracks and estimates memory consumption for each database that is paged into memory. It groups keys and their values together for efficient block level compression, which reduces the disk IO required to store and retrieve data. Rather than storing first class Java objects in memory, PageDB keeps the data in compressed byte format, which is significantly more efficient than storing the first class Java objects in the heap.


What’s Next?

Parts one and two of this series introduced Hydra, Stream Servers, and the tree-building process. These components provide the foundation necessary to discuss the distributed query engine we use to retrieve information from our clusters and that will be the topic of the next post in this series. If you’d like to work on problems like these we are always hiring.

Clearspring’s Big Data Architecture, Part 1

This is the first post in a four part series that will describe the data analytics architecture Clearspring has created to process large amounts of semi-structured data.  Check out post 2 and post 3 when you’re done reading this post.

On a daily basis, the Clearspring network yields tens of billions of new, unique data points. In storage terms, we ingest 4 to 5 TB of new data each day; that could easily double in 12-18 months. That data must be processed as live streams, re-processed from the archive as new algorithms are developed, and made available for querying in flexible ways. An interesting mix of batch, live and hybrid jobs is employed. To keep pace with a rapidly expanding network, the architecture must not only efficiently handle the current set of tasks, but it must also enable significant growth while guarding against hardware and software faults. To satisfy these needs, we built an entire software suite that leverages open source technologies and many in-house software components.


Desired System Characteristics and Challenges

  1. Efficient and reliable storage of large amounts of data on unreliable machines in a distributed system with fast writes, data partitioning and replication, and data streaming capabilities.
  2. Knowledge discovery.  Our system consumes a large amount of raw data that in and of itself is not very interesting so we need to be able to extract the interesting information by analyzing large streams of data from multiple sources.
  3. Management of distributed resources.  We wanted the system to distribute jobs in the cluster, determine when it is safe to execute the job, report failures, and provide a nice GUI to visualize the current state of the cluster.
  4. Speed.  The small things add up when you do them over a billion times.  For example, if we have 2.5 billion messages and we shave 1 millisecond (.001 seconds) off the time it takes to process a single event we will have saved nearly 29 days of computation time.  In our case, milliseconds matter!
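
The saving quoted in the last item follows directly from the numbers given; the variable names below are only for illustration.

```python
messages = 2_500_000_000
saved_ms_per_message = 1                            # one millisecond per event

saved_seconds = messages * saved_ms_per_message / 1000
saved_days = saved_seconds / 86_400                 # 86,400 seconds per day
print(saved_seconds, round(saved_days, 1))
```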

There are many systems available that provide some of the characteristics listed above, including enterprise SANs, HDFS, OpenStack (Swift), CouchDB, Redis, ElephantDB, Voldemort, Hadoop, Cassandra, and others.  We do use some of these projects for various components of our architecture. For example, we use Cassandra for our distributed counting system, which keeps track of how many times a URL is shared and serves over 200M view requests daily. However, the primary system we use for our distributed processing needs is a project we call Hydra.



The Hydra system we created internally has four main functional areas:
  1. Capability to partition, store, and stream data in a distributed system.
  2. Software that takes an input data stream, processes that data (filtering, counting, membership detection, etc.) and then stores the batch output in Berkeley DB JE, which uses sequential writes and indexes the batch output for fast reads.
  3. A set of software modules that enable ad-hoc queries against tree-based data structures.
  4. Command and control subsystem responsible for monitoring and executing jobs in the cluster.


Streaming and Splitting Data

We have too much raw data to store on a single machine and we were not interested in purchasing an enterprise SAN capable of storing petabytes of data.  While we looked at several distributed file systems like HDFS, GlusterFS, and others we ended up going with a standard ext3 filesystem and implemented our own distribution and replication framework on top of that standard file system. There are two standard modules, Splitters and Stream Servers.


The Splitter consumes an input stream from a data source.  The data source could be a flat file on the local system, an input stream sourced by a Stream Server, a firehose of data exposed via an API, or any other source you can think of.  The format of the source data from the stream is described to the Splitter (in the near future we will be using ZooKeeper to store stream formatting information).  Each Splitter consumes a subset of the data based on a hash function. The Splitter then uses the stream format information and the job configuration script (usually a JSON file) to filter, transform, and create data elements which will be stored in n partitions on the local file system.  After a checkpoint is reached (all available input data has been consumed or some time limit has been reached), the files the Splitter stores are made available to other processes in the cluster via the Stream Servers.
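
Hash-based partitioning of this kind can be sketched in a few lines. The post does not say which hash function the Splitter uses, so CRC32 is an arbitrary choice here, and the function names and sample records are hypothetical.

```python
import zlib

def partition_for(key, num_partitions):
    """Map a record key to a partition using a stable hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def split(records, num_partitions):
    """Distribute (key, payload) records across n partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for key, payload in records:
        partitions[partition_for(key, num_partitions)].append((key, payload))
    return partitions

records = [("user-1", "view"), ("user-2", "share"), ("user-1", "click")]
parts = split(records, 4)
print([len(p) for p in parts])
```

A stable hash matters because every record with the same key must land in the same partition on every run and on every machine.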

Splitter processes are responsible for ensuring that they do not reprocess data that has already been consumed.  They do this by storing metadata in their local job directory that indicates which files they’ve consumed and an offset indicating how far into those files the process has read.  Because we only append to log files and never modify existing content, the Splitter process doesn’t need to re-read data it has already processed when more data is appended to a file.  We feel having each client track its own state is a better approach than having the Stream Server attempt to track the current state of every consumer in the cluster.
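
A minimal sketch of client-side offset tracking for an append-only log is shown below. The JSON state file, the file names and the function `read_new_lines` are assumptions made for the example and do not describe the Splitter's actual metadata format.

```python
import json
import os
import tempfile

def read_new_lines(log_path, state_path):
    """Return complete lines appended to `log_path` since the last call."""
    state = {}
    if os.path.exists(state_path):
        with open(state_path) as f:
            state = json.load(f)
    offset = state.get(log_path, 0)

    with open(log_path, "rb") as log:
        log.seek(offset)                 # skip everything already consumed
        data = log.read()

    end = data.rfind(b"\n") + 1          # leave a trailing partial line unread
    state[log_path] = offset + end
    with open(state_path, "w") as f:
        json.dump(state, f)
    return data[:end].decode("utf-8").splitlines()

with tempfile.TemporaryDirectory() as job_dir:
    log_path = os.path.join(job_dir, "events.log")
    state_path = os.path.join(job_dir, "splitter_state.json")

    with open(log_path, "a") as log:
        log.write("event 1\nevent 2\n")
    first = read_new_lines(log_path, state_path)

    with open(log_path, "a") as log:
        log.write("event 3\n")           # appended later
    second = read_new_lines(log_path, state_path)

print(first, second)
```

The second call returns only the newly appended line, which works precisely because existing content is never rewritten.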

This simple setup can accomplish some very powerful things beyond the obvious capability of storing and partitioning raw data to a local node in the cluster.  One useful function the Splitter provides is recombining many small files into a smaller number of time sorted files.  This can be accomplished by having n splitter nodes consume from m sources where n < m.  All of the data from the data sources will be consumed but they will be persisted into a smaller number of files than originally required.
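
Recombining several sources into one time-sorted output can be sketched with a standard k-way merge. The sketch assumes each source is already sorted by timestamp, which the paragraph above does not state, and the `(timestamp, payload)` records are made up.

```python
import heapq

# Three hypothetical sources (m = 3), each already sorted by timestamp.
sources = [
    [(1, "a"), (4, "d")],
    [(2, "b"), (5, "e")],
    [(3, "c"), (6, "f")],
]

# One consumer (n = 1) merges them lazily into a single time-sorted stream.
merged = list(heapq.merge(*sources, key=lambda record: record[0]))
print(merged)
```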

Stream Server

Stream Servers provide access to the files stored on one node to any other node in the cluster. The Stream Server transmits raw bytes to clients because the files may be stored in a compressed format. This limits the number of times the data in the file is decoded and reduces the total amount of data transmitted over the network. This approach does prevent the Stream Server from applying fine grained filters to the individual log lines. However, we think that this limitation is acceptable given the CPU cycles saved by only decoding the file once on the client side and the massive reduction in network IO.

Requests for data come into the Stream Server in the form of regular expressions which describe which files the client would like to receive.  Here is an example of what a request might look like:



The snippet above would tell the Stream Server to find all files, in any directory under ‘job/1/*/live/split/us’, that match the regular expression defined in the match field.  The date format will be replaced at run-time with the set of dates the client is interested in and the mod variable will be replaced with the modulus for the client.  This approach enables a client to stream data for a specific partition and time range without requiring complex filtering on the server side.



This was part one in a series of posts where we will explain how Clearspring processes and stores multiple terabytes of data daily.  In upcoming articles we will explore how and why we use a tree-based data structure, our distributed query system, and the command and control infrastructure that binds it all together.  Check out post 2 and post 3 to learn more.  Subscribe to our RSS feed or follow us on Twitter to stay informed.