Getting Started With Hydra

Now that Hydra is open source we can start to talk about how to use it for common data processing tasks.  In this post we will answer several questions about log files generated using Log-Synth.  The Log-Synth files we will be using for this post have four columns:


And the sample data looks like this:


Our goal is to answer the following questions:

  1. What are the top IP addresses by request count?

  2. What are the top IP addresses by unique user?

  3. What are the most common search terms?

  4. What are the most common search terms in the slowest 5% of the queries?

  5. What is the daily number of searches, the (approximate) number of unique users, the (approximate) number of unique IP addresses, and the distribution of response times (average, min, max, and the 25th, 50th, and 75th percentiles)?

For ease of development and testing, the full Hydra stack can be run on a single machine.  If you haven’t already cloned the Hydra repo and set up your own local stack, you can do so by following the README instructions here.  It is very important to build Hydra with the -Pbdbje flag to ensure BDB is included in your build.

In our scenario we will be processing 1 million sample log lines.  Obviously, with an input data set this small we would not recommend using Hydra or any other distributed system; we are simply demonstrating the capability here.

The first step is to copy the sample data generated using Log-Synth into a directory that your Hydra LocalStack has visibility to.   Hydra LocalStack’s base directory for serving data is ~/hydra/hydra-local/streams.  We will put our sample data in ~/hydra/hydra-local/streams/log-synth.  In order to demonstrate Hydra’s ability to process data in concurrent streams and then join that data to get de-duplicated unique counts we will split the 1M records into 10 files.  There is no partitioning algorithm used so it is possible for the UIDs and IPs to be repeated across multiple files.  In the end our streams directory looks like:


So now our goal is to build a Hydra job that will answer the questions we posed at the top of the post.

The Source, Map, and Output are the three sections that nearly every Hydra job contains.
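At a high level a job configuration is just those three sections side by side.  Here is a minimal sketch of that overall shape (contents elided; the exact top-level syntax in Hydra’s job config format may differ slightly):

    // a Hydra job is, roughly, three sections side by side
    source: { ... },   // where the input records come from
    map:    { ... },   // how each record is filtered and transformed
    output: { ... }    // where, and in what shape, the results are written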


The Source section tells Hydra where to find the input data for a given job.  The most common type of source is a Mesh source, which can stream data from any peered node in the cluster.  In our sample case we will be streaming data from our localhost, but the source configuration defined here is the same for a single-node cluster or a large distributed cluster.


Let’s walk through this Source definition to explain the configuration.

Setting hash to true tells Hydra to apply a hashing function to each file name.  Hydra will use this hash value modulo the number of processing tasks to determine which tasks should process which files.   This ensures that each unique input file is processed by one and only one task processor.

On line 5 we define the mesh.  In this case we are using all defaults and simply providing the match string used to identify the files that we wish to process.  This section is very powerful, but in our simple case nothing fancy is needed.

The format component of the source definition starts on line 7.  Because we are using a simple CSV file for input, we need to describe the layout of the input file to the source.  When using a self-describing binary codec this description is not required.  In our case we assign names to the columns and then indicate that anything surrounded by double quotes should be grouped together into a single value.
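Putting those pieces together, a source section along the lines described above might look roughly like the sketch below.  The type and parameter names are approximations, and the column names are assumptions based on the fields referenced later in this post; the gist linked at the end of the post contains the real configuration.

    source: {
        type: "mesh2",               // stream files from peered mesh nodes (type name approximate)
        hash: true,                  // hash file names so each file is handled by exactly one task
        mesh: {
            files: ["log-synth/*"],  // match string identifying the input files (pattern assumed)
        },
        format: {
            type: "column",                                 // simple CSV input (type name approximate)
            columns: ["UID", "IP", "TERMS", "QUERY_TIME"],  // column names and order assumed
            // plus an option telling the parser to group anything inside double quotes
            // into a single value (parameter name omitted here)
        },
    }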


The Map section is where we tell the Hydra job what fields to process and how to manipulate those fields.  Hydra comes with a large number of BundleFilters and ValueFilters that can be used to manipulate the values of a given record.

The Map section has three optional subsections: FilterIn, Fields, and FilterOut.  In this case we only define the FilterOut section, which applies bundle and value filters to the contents of a row before emitting it to the output sink.


Note that filters are composable.  In this example we use a BundleFilterChain, which invokes an array of BundleFilters in series.  Each filter is applied in turn, and the chain can be broken if a filter returns false.  In that case the record that caused the filter to return false will be omitted from the output stream.  If we want to ensure that all filters are applied, we can set the parameter failStop to false on the BundleFilterChain.

First we apply the trim filter to the UID and IP address fields to ensure that they have no leading or trailing spaces. Next we apply the num filter which is a calculator that supports reverse polish notation (RPN).  In this case the QUERY_TIME field is being multiplied by 1000 and then that value is inserted into a new field QUERY_TIME_MOD.  This is done because we want to track the percentile distributions of query times and the data structure we use for this does not support floating point values.

The final filter is the debug filter.  This is a handy utility that prints out the first n records to the debug log files in text form.  This makes it easy for a Hydra Job developer to see the values of a record without doing a lot of sleuthing.  Note that the debug filter is the last filter applied in the chain.  This means the output record (and the debug statements) will contain all of the manipulations performed by the previous filters.
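A sketch of the corresponding Map section follows.  The filter chain mirrors the description above (trim, num, debug); the op names and parameters are approximations rather than verified Hydra syntax.

    map: {
        filterOut: {
            op: "chain",        // BundleFilterChain: apply each filter in series
            failStop: true,     // break the chain (and drop the record) if a filter returns false
            filter: [
                // trim leading/trailing whitespace from the UID and IP fields
                {op: "field", from: "UID", filter: {op: "trim"}},
                {op: "field", from: "IP",  filter: {op: "trim"}},
                // RPN calculator: QUERY_TIME * 1000 -> QUERY_TIME_MOD (parameters omitted)
                {op: "num" /* QUERY_TIME, 1000, multiply, store in QUERY_TIME_MOD */},
                // print the first n records to the debug log so we can inspect them
                {op: "debug"},
            ],
        },
    }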


The last component of the job is the Output.  In this example we will be using a Tree output type.  Recall the questions we wanted to answer about this data set.   What are the top search terms?  What are the top IP addresses by record count and unique users?  When building a Hydra tree we use the questions we are trying to answer to drive the structure of the tree.   We could just have one path through our tree that looks like

  • root
    • ${time}
    • ${ip}
    • ${uid}
    • ${terms}

Where the variables, ${var}, are replaced with the contents of the record value.  While functional, this tree structure would require expensive queries that can be optimized by adding more paths through the tree.  This is similar to adding indexes to a relational table.  Just as with indexes in a relational database, we can choose to have more or fewer paths through our tree to make the queries we run more or less efficient.  Our optimized tree will look like:

  • root
    • all
      • ${terms}
    • byip
      • ${ip}
      • ${terms}
    • bytime
      • ${time}
      • ${terms}

Adding data attachments is the last optimization we’ll add to the tree.  Data attachments are data structures that we can attach directly to a node in the tree.  There is a long list of available data attachments in Hydra.  The advantage of using these structures is that you can access meta-information about a node or its children without having to traverse the full tree.  For example, we can keep track of the top-n child nodes by record count in a key topper.  The key topper uses a streaming summarization algorithm that can efficiently track the top-k elements in the stream.  The final tree looks like:

  • root - [top-terms, top-uids, top-ips, count-unique-uids,count-unique-ips, percentile-distribution-time]
    • all
      • ${terms}
    • byip - [count-unique-uids]
      • ${ip}
      • ${terms}
    • bytime
      • ${time}
      • ${terms}

We will show how these data attachments are used when we run queries against the processed data.
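Before running queries, here is a rough sketch of the tree output described above.  The node and data attachment syntax is an approximation of the real configuration (see the gist below); the attachment names uuids, uips, and time are the short names assumed in the queries that follow, and the top-uids and top-ips toppers are omitted for brevity.

    output: {
        type: "tree",
        root: [
            {type: "const", value: "root",
                data: {
                    // data attachments on the root node (type names and parameters omitted)
                    "top-terms": { /* key topper over search terms, size 1000 */ },
                    "uuids":     { /* HyperLogLogPlus unique-UID counter */ },
                    "uips":      { /* HyperLogLogPlus unique-IP counter  */ },
                    "time":      { /* percentile distribution of QUERY_TIME_MOD */ },
                }},
            {type: "branch", list: [
                [{type: "const", value: "all"},
                 {type: "value", key: "TERMS"}],
                [{type: "const", value: "byip"},
                 {type: "value", key: "IP", data: {"uuids": { /* unique-UID counter */ }}},
                 {type: "value", key: "TERMS"}],
                [{type: "const", value: "bytime"},
                 {type: "value", key: "QUERY_TIME_MOD"},
                 {type: "value", key: "TERMS"}],
            ]},
        ],
    }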


Here is a gist of the full job.

Now that we’ve built our job we can run it through Hydra’s Spawn UI.   Spawn’s URL is http://localhost:5052/ on your local machine (if local-stack is running).  We create the job with 4 nodes (this means four processing tasks will consume the input data).

After the job has run it is time to query the data and answer the original set of questions.

What are the top IP addresses by request count?

This is a basic question and easy to answer with a Hydra query.  Recall that we have a ‘byip’ branch in our tree.  Every Hydra tree node keeps track of how many times that path has been traversed.  We call this the hit count.  We formulate our query by following the desired path through our tree.  The query path looks like:


The +:+hits element may look a little funny but it is actually very simple.  The + sign indicates we should collect the value of that element in our query results.  The colon is used to access the metadata on the node.  So :+hits gives us the number of times that path has been traversed.
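Concretely, the path for this first question looks something like the line below (a leading element addressing the root node may be required, depending on how the tree is rooted):

    byip/+:+hits

This yields two columns from each task: the IP address itself (from the +) and the number of times that IP’s node was hit (from :+hits).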

There is one complication we need to consider.  Our job ran on four separate processing nodes so we will need to join the data from those nodes into a common result set.  The data was split randomly amongst the files so it is possible that the sample IP address appears on more than one of the processing nodes.  Imagine the results for the individual nodes for a single IP look like:

  • Node 1: -> 100
  • Node 2: -> 25
  • Node 3: -> 50
  • Node 4: -> 25

The desired end result is that IP seen 200 times, not the same IP repeated four times with different values.  To achieve the desired result we use the query operation gather.  This operation merges rows based on some instruction set.  We have two columns (two + collectors) in the result set, so gather=ks will merge all rows that have the same value (key) in the first column and then sum the values in the second column.

We want the top IPs, so we will tell Hydra to sort the results by the second column in descending order.  sort=1:n:d accomplishes this.  The n in the middle tells Hydra to treat the column as a number as opposed to the default String sorting.

Finally we want to limit the total result set to the top 100 results.  We use limit=100 for this purpose.  It is important to note that we apply the limit after sorting the data set; if the limit were applied first, the results would not be the true top 100.  Here is a screenshot of our query and sample results:
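Putting the path and the three operations together, the query might be written roughly as follows (the semicolon separator between ops is an assumption; the query UI may accept them as separate fields):

    path: byip/+:+hits
    ops:  gather=ks;sort=1:n:d;limit=100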

What are the top IP addresses by unique users?

This question is a little more interesting.  Rather than simply providing the list of IPs that had the most records, we want the list of IPs that had the most unique users.  This is where the power of data attachments becomes apparent.  We include a unique count data attachment (backed by a HyperLogLogPlus data structure) on each IP data node.  In this case the HLL may be a bit of overkill because each IP is unlikely to have a huge number of UIDs associated with it, but it is good for demonstration purposes.

The good news is that our query to answer this question is almost exactly the same as the previous query except in this instance we will get the count from the unique uid data attachment rather than the raw number of records.


Basically :+hits is replaced with $+uuids where the $ is used to access the data attachment and uuids is the arbitrary name for the unique UID counter we added to our tree.  The results look like:
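So the sketch of this query differs from the previous one only in the path element that supplies the second column:

    path: byip/+$+uuids
    ops:  gather=ks;sort=1:n:d;limit=100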

What are the most common search terms?

We will show how to answer this question using two different approaches.

The first uses the top terms data attachment we placed on the root node of the tree.  The query looks like:


Key Toppers create virtual nodes that you can query against as if they were real nodes in the tree.  So even though the attachment is on the root node, we query it one level below that.  The thing to keep in mind here is that this provides approximate results.  Note that we specified the size of this topper to be 1000 elements, so even though there could be millions or billions of search terms present in the data set, only the top 1000 will be in the key topper.  Here is a screenshot of the query:

The second approach provides an exact answer, but at the cost of navigating each search term node in the tree individually.  The result is more accurate than the probabilistic approach but it comes at a much higher cost.  The query:


Here we are collecting the search term node and the number of times that node has been traversed.  We need to do the same merge we have done in previous queries, but now the number of records is much larger.  So rather than using gather we will use the merge operation, which joins adjacent records.  To ensure that all records with the same key are joined, we must first sort the data to align the keys.  The desired operations are:
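A sketch of that operation chain is below; the column codes for merge are assumed to mirror gather’s, so the exact spelling may differ.

    ops:  sort=0;merge=ks;sort=1:n:d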


What are the most common search terms in the slowest 5% of the queries?

This is the same question we asked above, but now we want to limit the results to only the queries that were in the slowest 5% based on their query time.  So first we must determine the query time at the 95th percentile.  To answer this question, recall that we added the distribution data attachment to the root node in our tree.  We can query this attachment to determine the 95th percentile.  The query path is straightforward:
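As a sketch only: the path addresses the percentile-distribution attachment we placed on the root node (assumed here to be named time); exactly how the requested percentile (95) is passed to the attachment is not shown here, so treat the line below as a placeholder.

    path: $+time      # percentile-distribution attachment on the root node; the
                      # requested percentile (95) is supplied as a parameter to it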


Some math is required to get the output in the desired format.  We will use the RPN calculator to help here.  When the query time values were placed in the attachment we multiplied them by 1000 to turn them from floating points to longs.  Now we need to reverse that.  The operation to do so is as follows:
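Based on that description, the calculator operation looks roughly like this (the num op name matches the RPN filter used earlier in the job; its exact spelling in the query ops may differ):

    ops:  num=c0,1000,ddiv,0,set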


Think of the RPN as a stack.  We push the value from column 0 (c0) and the value 1000 onto the stack.  Then we apply the ddiv operation, a floating-point divide, which pops those two values and pushes the result back onto the stack.  Next we push the value 0 onto the stack.  Finally, the set operation pops the column index (0), pops the next value on the stack (the result of the ddiv), and stores that value in column 0.

We still aren’t done because now we have 4 values (remember we have four processing nodes and each node has its own value).  So we’ll take the average of those four 95th percentile values.  We can use the gather operation for this purpose:


Here is the end result:

Now that we have determined the 95th percentile query time, we want to find the top query terms among the queries at or above that time.  We’ll use the bytime branch for this query:
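The path walks the bytime branch and collects three columns, roughly as sketched below: the (scaled) query time from the first +, and then each search term together with its hit count.

    path: bytime/+/+:+hits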


The operations will be used to filter out results that fall below the 95th percentile.  Note that in a real production job we would likely store the queries bucketed into time intervals to prevent a full scan of all records.  So first up is dropping all records that are less than our 95th percentile.  Once again the RPN calculator is used for this.


Now that we’ve dropped records below our 95th percentile, we no longer need the time column, so we can drop it using the i command for the gather op.  The other two columns (terms and hits) will be merged using the term as the key and then sorted by the hit count descending.
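So the remaining operations look roughly like this (semicolon separator assumed, as before):

    ops:  gather=iks;sort=1:n:d      # i = drop the time column, k = key on the term, s = sum the hits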


The final query looks like:

What is the daily number of searches, (approximate) number of unique users, (approximate) number of unique IP addresses and distribution of response times?

These questions are all easy to answer in Hydra.  For the total number of searches we’ll just assume that our input data in this case is a full day of searches.  So all we need to do is look at the hit count on the root node:

Next up is the approximate number of unique IPs and UIDs.  We’ll answer both of these with one query:

Recall that these unique counters are backed by HyperLogLogPlus data structures.  So even though the results are coming in from four different processing nodes, Hydra is smart enough to know that these data structures are HLLs, and the sum operation, gather=ss, does not do a simple sum.  Instead it merges the HLLs to get a globally unique count of UIDs and IP addresses.
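A sketch of this query: two columns are collected from the root node’s data attachments (the uips name for the unique-IP counter and the element syntax for collecting two attachments at once are assumptions), and gather=ss merges the per-task sketches.

    path: $+uuids $+uips     # one column per root data attachment (element syntax approximate)
    ops:  gather=ss          # merges the HyperLogLogPlus sketches instead of naively summing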

The final question we want to answer is the distribution of response times.  We’ve seen a preview of this query when we retrieved the 95th percentile response times.  The distribution data attachment is used once again.  But this time we’ll ask it to return several values rather than just the 95th percentile.

Hopefully this article will help you get started with open source Hydra.  If you have any questions you can find us in our IRC chatroom #hydra or on our mailing list.