
Hydra: Storage of Nested Hierarchical Data

Hydra is a distributed data processing and storage system developed at AddThis, which we recently released as open source. It ingests streams of data and builds hierarchical tree structures that are aggregates, summaries, or transformations of the data. Sibling nodes in the tree are stored in lexicographic sorted order. This ordering is often used explicitly by users writing queries, or implicitly by the query system to optimize query execution.

The trees generated by Hydra are large, typically on the order of millions or billions of nodes. It is natural to store the tree in a KV storage system that is persisted to disk. In practice, we have observed a significant decrease in processing throughput when we interact with the external storage system directly. The individual nodes of the tree can be relatively small, so the overhead of serializing and deserializing these objects individually can be high. Instead of writing nodes directly to disk, we aggregate a collection of (key, node) pairs into pages of objects and use the external storage to operate on these aggregate pages.
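
As a rough illustration, a page in this scheme might look like the following minimal Java sketch; the Page class, its fields, and its methods are hypothetical names for this post rather than Hydra's actual types.

    import java.util.TreeMap;

    // Hypothetical sketch: a page groups a contiguous run of (key, node) pairs so that
    // the external store handles one serialized blob instead of many tiny values.
    final class Page<K extends Comparable<K>, V> {

        // Pairs on a page are kept in key order, matching the tree's sibling ordering.
        private final TreeMap<K, V> entries = new TreeMap<>();
        private final int maxEntries;

        Page(int maxEntries) {
            this.maxEntries = maxEntries;
        }

        void put(K key, V node) {
            entries.put(key, node);
        }

        // A page that exceeds its capacity is split before being handed to the external storage.
        boolean isFull() {
            return entries.size() >= maxEntries;
        }

        // The first key identifies the page in the external (key, page) mapping.
        K firstKey() {
            return entries.firstKey();
        }
    }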

[Figure: Tree]

To implement this paged, ordered collection we use a three-tiered hierarchy. The node cache stores (key, node) pairs. A node may have zero or more active leases, and it may be safely evicted from the node cache only when its lease count is zero. The page cache receives nodes evicted from the node cache and places each node into its correct page; it holds the most recently accessed pages from the external DB. The external storage maintains a collection of (key, page) pairs, where the keys stored on a page are contiguous with respect to the sort order of the keys, and pages are stored in a serialized representation.
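
A read that flows through the three tiers might look roughly like the sketch below, assuming hypothetical NodeCache, PageCache, and TreeStore types; the actual classes are described in the rest of this post.

    // Hypothetical sketch of the read path through the three tiers. NodeCache, PageCache,
    // and TreeStore are illustrative names, not Hydra's class names.
    interface NodeCache<K, V> {
        V getIfPresent(K key);            // returns a cached node, or null on a miss
        void put(K key, V node);
    }

    interface PageCache<K, V> {
        V lookup(K key);                  // loads the containing page from the external DB if needed
    }

    final class TreeStore<K, V> {

        private final NodeCache<K, V> nodeCache;
        private final PageCache<K, V> pageCache;

        TreeStore(NodeCache<K, V> nodeCache, PageCache<K, V> pageCache) {
            this.nodeCache = nodeCache;
            this.pageCache = pageCache;
        }

        V get(K key) {
            V node = nodeCache.getIfPresent(key);    // 1. check the node cache
            if (node == null) {
                node = pageCache.lookup(key);        // 2. fall back to the page cache / external DB
                if (node != null) {
                    nodeCache.put(key, node);        // 3. promote the node: it is now the live copy
                }
            }
            return node;
        }
    }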

The node cache is a concurrent cache that stores (key, node) pairs. There are two caveats to this definition, and both concern the eviction of nodes: (a) only nodes with a lease count of zero may be evicted, and (b) a node must be written to the page cache when it is evicted. We implemented these requirements with a fork of the ConcurrentLinkedHashMap project. In our MediatedEvictionConcurrentHashMap, the cache may be provided with a boolean onEviction(key, value) method. This callback can selectively prevent entries from being evicted from the cache, and it may also perform a side effect when an element is evicted, such as writing the latest value to the page cache.
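
The shape of such a callback is roughly the following; Leased, PageCacheWriter, and NodeEvictionMediator are hypothetical names used only for illustration, while the real hook is the onEviction() method on our fork.

    // Hypothetical sketch of a mediated eviction hook in the spirit of the
    // boolean onEviction(key, value) method described above. Leased, PageCacheWriter,
    // and NodeEvictionMediator are illustrative names, not the actual Hydra types.
    interface Leased {
        int leaseCount();                     // number of active leases held on a node
    }

    interface PageCacheWriter<K, V> {
        void write(K key, V node);            // push a node down into its page
    }

    final class NodeEvictionMediator<K, V extends Leased> {

        private final PageCacheWriter<K, V> pageCache;

        NodeEvictionMediator(PageCacheWriter<K, V> pageCache) {
            this.pageCache = pageCache;
        }

        // Returning false vetoes the eviction; returning true allows it to proceed.
        boolean onEviction(K key, V node) {
            if (node.leaseCount() > 0) {
                return false;                 // caveat (a): leased nodes stay in the node cache
            }
            pageCache.write(key, node);       // caveat (b): the page cache gets the latest copy
            return true;
        }
    }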

The page cache is a concurrent ordered map of (key, page) pairs. When a requested page is not found in the page cache, it is loaded from the external storage. A page has a maximum capacity, which can be specified either as a number of pairs per page or as a memory estimate of the pairs stored on the page. Values are deserialized from their byte array representation on demand, as they are requested by the node cache.
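
A simplified sketch of this lazy deserialization might look like the following, with Decoder and LazyPage as hypothetical names for illustration.

    import java.util.TreeMap;

    // Hypothetical sketch of on-demand deserialization within a page: values stay as byte
    // arrays until they are actually requested. Decoder and LazyPage are illustrative
    // names, not Hydra's actual types.
    interface Decoder<V> {
        V decode(byte[] raw);
    }

    final class LazyPage<K extends Comparable<K>, V> {

        private final TreeMap<K, byte[]> rawEntries = new TreeMap<>();
        private final Decoder<V> decoder;

        LazyPage(Decoder<V> decoder) {
            this.decoder = decoder;
        }

        void putRaw(K key, byte[] serialized) {
            rawEntries.put(key, serialized);
        }

        // Deserialization cost is paid only for keys that are actually requested.
        V get(K key) {
            byte[] raw = rawEntries.get(key);
            return raw == null ? null : decoder.decode(raw);
        }
    }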

This abstraction layer, known as the page cache, is not to be confused with the operating system's page cache. The purpose of our page cache is twofold: (1) it mediates between (key, node) pairs and (key, page) pairs, and (2) it ensures that inactive nodes are serialized to disk. Most KV store implementations assume that keys and values are immutable objects. This assumption is upheld by writing a node from the node cache to the page cache only when the node has no active leases. If a node lives in both the node cache and the page cache, then its copy in the node cache is considered the “correct” version.

Because the page cache translates between serialized representations and their Java representations, there is no concept of a “live” object in the page cache. The value on a page may be serialized and deserialized into multiple instances on the JVM heap. An object is considered “alive” only if it is present in the node cache, and this is also why a value evicted from the node cache must be written back into the page cache: the page cache may have deserialized a stale copy of the value in the interim.

A skiplist is used by the page cache to maintain a concurrent ordered map of pages. Two invariants are maintained by the page cache: (a) when an operation must lock two or more pages, the lower page must be locked before the higher page, and (b) before a page is loaded from the external storage, the page in the cache with the highest key less than the new page's key must be locked. Eviction is performed asynchronously by background threads. An implementation of the clock page replacement algorithm determines which pages are evicted to the external storage. If a page exceeds its capacity, it is split into two sibling pages. Each page has a reader/writer lock to maintain consistency.
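
Invariant (a) amounts to always acquiring page locks in ascending key order, which the following sketch illustrates; PageStub and PageLocking are hypothetical names for illustration, not Hydra's page implementation.

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Hypothetical sketch of invariant (a): when an operation such as a page split needs
    // two pages, the page with the lower first key is always locked first.
    final class PageStub implements Comparable<PageStub> {

        final String firstKey;
        final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        PageStub(String firstKey) {
            this.firstKey = firstKey;
        }

        @Override
        public int compareTo(PageStub other) {
            return firstKey.compareTo(other.firstKey);
        }
    }

    final class PageLocking {

        // Acquire write locks on both pages in ascending key order to avoid deadlock.
        static void lockPair(PageStub a, PageStub b) {
            PageStub lower = a.compareTo(b) <= 0 ? a : b;
            PageStub higher = (lower == a) ? b : a;
            lower.lock.writeLock().lock();       // invariant (a): lower page first
            higher.lock.writeLock().lock();
        }

        // Release in the reverse order of acquisition.
        static void unlockPair(PageStub a, PageStub b) {
            PageStub lower = a.compareTo(b) <= 0 ? a : b;
            PageStub higher = (lower == a) ? b : a;
            higher.lock.writeLock().unlock();
            lower.lock.writeLock().unlock();
        }
    }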

The external DB can be any KV storage system that maintains an ordered mapping of keys to values persisted to disk. Keys and values at this layer of abstraction are represented as serialized byte arrays. The goal is for any DB implementation to be interchangeable with another. The Hydra project currently supports the Berkeley DB Java Edition, and support for additional DB implementations is planned.
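
A sketch of the kind of interface such a pluggable byte store might expose is shown below; the ByteStore name and its methods are assumptions made for illustration, not Hydra's actual interface.

    // Hypothetical sketch of the external-DB abstraction: an ordered, persistent mapping
    // of byte-array keys to serialized pages. Any DB that can satisfy this contract could
    // back the page cache.
    interface ByteStore extends AutoCloseable {

        void put(byte[] key, byte[] page);       // write a serialized page

        byte[] get(byte[] key);                  // read a serialized page, or null if absent

        // Largest stored key strictly less than the given key; ordered access like this is
        // what distinguishes the external DB from a plain hash store.
        byte[] lowerKey(byte[] key);

        @Override
        void close();                            // flush and release the underlying DB
    }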

The three layers described above have been implemented using fine-grained locking: each layer allows reader and writer threads to operate concurrently on disjoint sections of its data structures. These implementations, the MediatedEvictionConcurrentHashMap, the SkipListCache, and the ConcurrentByteStoreBDB, supersede an earlier implementation that used a single coarse-grained lock to maintain consistency at each layer. We are in the process of phasing out the old implementation in favor of the new one.

Hydra is released under the Apache License Version 2.0. It is available on our GitHub page along with several other open-source projects. Discussion takes place on a mailing list and in the #hydra IRC channel on Freenode.