This is the first post in a three part series that will describe the data analytics architecture Clearspring has created to process large amounts of semi-structured data. Check out post 2 and post 3 when you’re done reading this post.
On a daily basis, the Clearspring network yields tens of billions of new, unique data points. In storage terms, we ingest 4 to 5 TB of new data each day; that could easily double in 12-18 months. That data must be processed as live streams, re-processed from the archive as new algorithms are developed and able to be queried in flexible ways. An interesting mix of batch, live and hybrid jobs are employed. To keep pace with a rapidly expanding network, the architecture must not only efficiently handle the current set of tasks, but it must also enable significant growth while guarding against hardware and software faults. To satisfy these needs, we built an entire software suite that leverages open source technologies and many in-house software components.
Desired System Characteristics and Challenges
- Efficient and reliable storage of large amounts of data on unreliable machines in a distributed system with fast writes, data partitioning and replication, and data streaming capabilities.
- Knowledge discovery. Our system consumes a large amount of raw data that in and of itself is not very interesting so we need to be able to extract the interesting information by analyzing large streams of data from multiple sources.
- Management of distributed resources. We wanted the system to distribute jobs in the cluster, determine when it is safe to execute the job, report failures, and provide a nice GUI to visualize the current state of the cluster.
- Speed. The small things add up when you do them over a billion times. For example, if we have 2.5 billion messages and we shave 1 millisecond (.001 seconds) off the time it takes to process a single event we will have saved nearly 29 days of computation time. In our case, milliseconds matter!
There are many systems available that provide some of the characteristics listed above, including Enterprise SANs, HDFS, Open Stack (Swift),CouchDB, Redis, ElephantDB, Voldemort, Hadoop, Cassandra, and others. While we do use some of these projects for various components of our architecture, as an example we use Cassandra for our distributed counting system that keeps track of how many times a URL is shared and serves over 200M view requests daily, the primary system we use for our distributed processing needs is a project we call Hydra.
The Hydra system we created internally has four main functional areas:
- Capability to partition, store, and stream data in a distributed system.
- Software that takes an input data stream, processes that data (filtering, counting, membership detection, etc) and then stores the batch output in Berkley DB JE which uses sequential writes and indexes the batch output for fast reads.
- A set of software modules that enable ad-hoc queries against tree-based data structures.
- Command and control subsystem responsible for monitoring and executing jobs in the cluster.
Streaming and Splitting Data
We have too much raw data to store on a single machine and we were not interested in purchasing an enterprise SAN capable of storing petabytes of data. While we looked at several distributed file systems like HDFS, GlusterFS, and others we ended up going with a standard ext3 filesystem and implemented our own distribution and replication framework on top of that standard file system. There are two standard modules, Splitters and Stream Servers.
The Splitter consumes an input stream from a data source. The data source could be a flat file on the local system, an input stream sourced by a Stream Server, a firehose of data exposed via an API, or any other source you can think of. The format of the source data from the stream is described to the splitter (in the near future we will be using Zookeeper to store stream formatting information). Each splitter consumes a sub-set of the data based on a hash function. The Splitter then uses the stream format information and the job configuration script (usually a JSON file) to filter, transform, and create data elements which will be stored in n partitions on the local file system. After a checkpoint is reached (all available input data has been consumed or some time limit has been reached), the files the Splitter stores are made available to other processes in the cluster via the Stream Servers.
Splitter processes are responsible for ensuring that they do not reprocess data that has already been consumed. They do this by storing meta data in their local job directory that indicates which files they’ve consumed and an offset indicating how far into those files the process has read. Because we only append to log files and never modify existing content the splitter process doesn’t need to re-read data it has already processed when more data is appended to a file. We feel having each client track its own state is a better approach than having the StreamServer attempt to track the current state of every consumer in the cluster.
This simple setup can accomplish some very powerful things beyond the obvious capability of storing and partitioning raw data to a local node in the cluster. One useful function the Splitter provides is recombining many small files into a smaller number of time sorted files. This can be accomplished by having n splitter nodes consume from m sources where n < m. All of the data from the data sources will be consumed but they will be persisted into a smaller number of files than originally required.
Stream Servers provide access to the files stored on one node to any other node in the cluster. The Stream Server transmits raw bytes to clients because they may be in a compressed format. This limits the the number of times the data in the file is decoded and reduces the total amount of data transmitted over the network. This approach does prevent the Stream Server from applying fine grained filters to the individual log lines. However we think that this limitation is acceptable given CPU cycles saved by only decoding the file once on the client side and the massive reduction in network IO.
Requests for data come into the Stream Server in the form of regular expressions which describe which files the client would like to receive. Here is an example of what a request might look like:
The snippet above would tell the Stream Server to find all files in any directory under ‘job/1/*/live/split/us’ that has a file that matches the regular expression defined in the match field. The date format will be replaced at run-time with the set of dates the client is interested in and the mod variable will be replaced with the modulus for the client. This approach enables a client to stream data for a specific partition and time range without requiring complex filtering on the server side.
This was part one in a series of posts where we will explain how Clearspring processes and stores multiple terabytes of data daily. In upcoming articles we will explore how and why we use a tree based data structure, our distributed query system, and the command and control infrastructure that binds it all together. Check out post 2
and post 3
to learn more. Subscribe to our RSS feed
or follow us
on twitter to stay informed.