
How to Join Data in Hydra


Hydra is great not only for continuously processing data streams, such as web logs, but also for one-off jobs such as ad-hoc data analysis, validation, and troubleshooting. Among the latter use cases, one of the more interesting and complicated ones is joining data sets. In this post, I’ll use an example to demonstrate how to join two data sets.

The Problem

There are two data streams, both keyed by UID (unique user ID), that we’d like to join. One contains user attributes:


TIME             UID      ATTRIBUTE_IDS
1419441590000    12345    1
1419441610000    67890    1,2
1419441610000    12345    3,4,5
1419441620000    11111    1,2
1419441630000    22222    3
...


Each line contains one UID and one or more Attribute IDs. The same UID may appear on multiple lines. The attribute IDs of the same UID on one line may or may not overlap with those on another line.

The other data stream maps UIDs to another ID space (PUID):


TIME             UID      PUID
1419441590000    12345    abc
1419441610000    67890    xyz
1419441710000    67890    zzz
...


Each line contains exactly one UID and one PUID. The same UID may appear on multiple lines, mapped to different PUIDs.

We want to join M days of user attribute data (tens of millions of records) with N days of UID mapping data (billions of records) to produce a data set that contains PUID and ATTRIBUTE_IDs:


PUID    ATTRIBUTE_IDS
abc     1,3,4,5
xyz     1,2
...

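To make the desired result concrete, here is a tiny in-memory sketch of the join over the sample records above (plain Python, purely illustrative; it obviously would not scale to the billions of records that make Hydra necessary):

from collections import defaultdict

# Sample records from the two streams: (UID, ATTRIBUTE_IDS) and (UID, PUID).
attr_records = [("12345", "1"), ("67890", "1,2"), ("12345", "3,4,5"),
                ("11111", "1,2"), ("22222", "3")]
mapping_records = [("12345", "abc"), ("67890", "xyz"), ("67890", "zzz")]

# Group attribute IDs and PUIDs by UID.
attrs_by_uid = defaultdict(set)
for uid, ids in attr_records:
    attrs_by_uid[uid].update(ids.split(","))
puids_by_uid = defaultdict(set)
for uid, puid in mapping_records:
    puids_by_uid[uid].add(puid)

# Emit one row per PUID for every UID present in both data sets.
for uid in sorted(attrs_by_uid.keys() & puids_by_uid.keys()):
    for puid in sorted(puids_by_uid[uid]):
        print(puid, ",".join(sorted(attrs_by_uid[uid], key=int)))
# abc 1,3,4,5
# xyz 1,2
# zzz 1,2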

The Solution

The keys to the solution are:

  • Use an aggregate input source to consume both data sets.
  • Build a tree keyed by UID, and add attribute IDs and PUIDs under the UID.
  • Construct the query to return the data we want.

Now let’s examine each key point in detail.

Using an aggregate input source

An aggregate input source is a container TaskDataSource that encapsulates multiple input sources. When a job runs, the child sources are consumed in the order they are listed:


source:[
  {
    type:"mesh2",
    markDir:"attr-marks",
    mesh:{
      startDate:"141127",
      endDate:"141127",
      dateFormat:"YYMMdd",
      files:["/job/[ATTR_JOB_ID]/*/gold/split/{Y}{M}{D}/{{mod}}-*"],
    },
    shardTotal:128,
    injectSourceName:"ATTR",
  },
  {
    type:"mesh2",
    markDir:"mapping-marks",
    mesh:{
      startDate:"141029",
      endDate:"141127",
      dateFormat:"YYMMdd",
      files:["/job/[MAPPING_JOB_ID]/*/gold/split/{Y}{M}{D}/{{mod}}-*"],
    },
    shardTotal:128,
    injectSourceName:"MAPPING",
  }
]


In the snippet above:

  • [ATTR_JOB_ID] and [MAPPING_JOB_ID] should be the actual source job IDs.
  • The attribute data source is listed before the mapping data source, so it will be processed first. The reason for doing this will be explained later.
  • A different markDir should be used for each source. markDir is a directory that Hydra uses to track which data has been processed for a source; it’s best not to reuse the same directory for different sources.
  • injectSourceName adds the specified field to every bundle, and sets the field value to the source data file path. It provides an easy way to identify a bundle’s data source.
  • Both data sources have the same number of shards.

The point about sharding deserves a more detailed explanation. The snippet doesn’t show the full picture of how the source jobs shard their output data. Because we are processing data by UID, the natural choice for the shard key is UID: we want each task of this job to process a subset of UIDs.

Using any other shard key would make the job orders of magnitude slower, in terms of both processing and query performance. It is also critical that the source jobs use the same sharding algorithm. We typically hash the UID value and then take it modulo n, where n is the number of output shards. In that case, the source jobs must use the same hash function and the same n (hence the same shardTotal in the snippet). Only when all of the above hold is each task of this job guaranteed to consume the same subset of UIDs from both sources.
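
As a concrete illustration of that requirement, here is a minimal sketch of such a shard function in Python. The MD5-based hash is only an example (not necessarily what the source jobs actually use); the important part is that both jobs agree on the hash function and on n:

import hashlib

SHARD_TOTAL = 128  # must match shardTotal in the job config above

def shard_for_uid(uid):
    # Hash the UID and take it modulo the number of shards. Both source jobs
    # must use the same hash function and the same SHARD_TOTAL; otherwise a
    # given UID can land in different shards in the two data sets.
    digest = hashlib.md5(uid.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % SHARD_TOTAL

# Every record for UID "12345", in either data set, lands in the same shard.
print(shard_for_uid("12345"))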

Building the Tree

For the sake of simplicity, we’ll assume the data doesn’t need any cleanup. That means the map section of the job config is very simple:


map:{
  fields:[
    "TIME",
    "UID",
    "ATTRIBUTE_IDS",
    "PUID",
    // injected source names
    "ATTR",
    "MAPPING",
  ],
  filterOut:{op:"chain", filter:[
    // Split ATTRIBUTE_IDS to an array
    {from:"ATTRIBUTE_IDS", filter:{op:"split"}}
  ]},
},


Note that:
1) we must include the injected source name fields (ATTR and MAPPING) in fields, and
2) we split ATTRIBUTE_IDS, a comma-delimited string, into an array (so that each element can later be expanded into a separate tree node).
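
The effect of that split filter is simply to turn the comma-delimited string into an array; in rough Python terms (this illustrates the data shape only, not Hydra’s actual bundle API):

# A bundle as it arrives from the attribute source:
bundle = {"TIME": "1419441610000", "UID": "12345", "ATTRIBUTE_IDS": "3,4,5"}

# After the split filter, ATTRIBUTE_IDS is an array, so the tree builder can
# later create one child node per attribute ID:
bundle["ATTRIBUTE_IDS"] = bundle["ATTRIBUTE_IDS"].split(",")
# {'TIME': '1419441610000', 'UID': '12345', 'ATTRIBUTE_IDS': ['3', '4', '5']}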

The real trick is in how we build the output tree: when processing the attribute data, we create new UID nodes as necessary; when processing the mapping data, we only update existing UID nodes, effectively throwing away UIDs that are not in the attribute data set. Because we are only interested in the overlap, keeping all UIDs from both data sets would add lookup/indexing overhead for no benefit at all.

The decision of which data set to process first, i.e. which data set new UID nodes are created from, comes down to size. Going back to the original problem statement, the attribute data set contains tens of millions of records while the mapping data set contains billions, so creating UID nodes from the attribute data results in a much smaller tree than the alternative, which is a very important performance consideration.

Here’s the output part of the job config:


output:{
  type:"tree",
  root:[
    {type:"const", value:"root"},
    {type:"branch", list:[
      // this branch adds uid nodes to the tree when processing attribute data
      [
        {type:"value", key:"UID",
          filter:{op:"field", from:"ATTR", filter:{op:"empty", not:true}}
        },
        // mark a record in the query result as attribute data
        {type:"const", value:"A"},
        // a node is created for each element in the ATTRIBUTE_IDS array
        {type:"value", key:"ATTRIBUTE_IDS"},
      ],
      // this branch adds mapping data to existing uid nodes (create=false)
      // when processing mapping data
      [
        {type:"value", key:"UID", create:false, 
          filter:{op:"field", from:"MAPPING", filter:{op:"empty", not:true}}
        },
        // mark a record in the query result as mapping data
        {type:"const", value:"M"},
        {type:"value", key:"PUID"},
      ],
    ]},
  ],
}


  • We build two logical branches, one for each data source, by applying a filter based on the injected source field. Physically, all attribute IDs and PUIDs for the same UID are added under the same node.
  • Two marker nodes (the constants "A" and "M") are used to group attribute IDs and PUIDs under each UID.
  • On the mapping branch, PathKeyValue’s create property is set to false so that it only updates existing nodes and never creates new ones.

The Query

I won’t go into great detail about Hydra queries here (though I recommend reading the Getting Started with Hydra blog post, which has many examples of how to query a Hydra job).

Given the job config above, this is what the tree will look like:


root
  +- 12345
  |    +- A
  |    |  +- 1
  |    |  +- 3
  |    |  +- 4
  |    |  +- 5
  |    +- M
  |       +- abc
  +- 67890
  |    +- A
  |    |  +- 1
  |    |  +- 2
  |    +- M
  |       +- xyz
  |       +- zzz
  +- 11111
  |    +- A
  |       +- 1
  |       +- 2
  +- 22222
  |    +- A
  |       +- 3
...


The query path should be:


root/+:nodes>1/+/+


The +’s retrieve data from all levels. The :nodes>1 on the UID level tells Hydra that we only want UIDs with at least two nodes under them (i.e. both the A and M marker nodes, meaning the UID has both attribute and mapping data). This query path will return data like this:


12345		A	1
12345		A	3
12345		A	4
12345		A	5
12345		M	abc
67890		A	1
67890		A	2
67890		M	xyz
67890		M	zzz


While the post-processing required to aggregate data per UID would be pretty easy at this point, we can apply rops (remote query operations, which are executed on the individual task/slave nodes) to make things even better:


str=c1,v=,cat,c2,cat,v2,set;merge=kij

 
OK, that needs some dissecting. Two operations are applied here: str and merge. The str op concatenates the second and third columns with an “=” and saves the result back to the third column. The data will look like this after the str op is applied:


12345		A	A=1
12345		A	A=3
12345		A	A=4
12345		A	A=5
12345		M	M=abc
67890		A	A=1
67890		A	A=2
67890		M	M=xyz
67890		M	M=zzz


The merge op says: use the first column (UID) as the key, ignore the second column, and join the values of the third column (with commas). The result after the merge op is:


12345		A=1,A=3,A=4,A=5,M=abc
67890		A=1,A=2,M=xyz,M=zzz


And that’s it! This is a fast query for two reasons: 1) there is no explicit sorting by UID in rops, because the records are naturally grouped by the path components; and 2) there are no ops (query operations executed on the master) to aggregate data across all tasks, because the data is sharded by UID, so all data for the same UID lives on the same slave and is already aggregated by UID there. Query ops on the master can be expensive because the master has to process data from all tasks. Without any ops, this query is also very memory efficient, since lines are streamed back as soon as they become available. The client application then just needs to transform one line at a time to produce the desired output format of PUID/attribute IDs (a sketch of that transformation follows). Of course, the UIDs in the final result are not sorted. If for whatever reason they need to be, you can use sort=0 for ops; since the slaves have already aggregated the data by UID, the number of records to be sorted on the master is minimized.
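
Here is roughly what that client-side transformation might look like (a Python sketch that assumes the query result arrives as whitespace-separated lines in the merged format shown above; a real client’s parsing details will differ):

def transform(line):
    # One merged query-result line, e.g. "12345    A=1,A=3,A=4,A=5,M=abc",
    # yields one (PUID, attribute IDs) row per PUID. The UID itself is no
    # longer needed in the final output.
    _uid, merged = line.split()
    values = merged.split(",")
    attribute_ids = [v[2:] for v in values if v.startswith("A=")]
    puids = [v[2:] for v in values if v.startswith("M=")]
    for puid in puids:
        yield puid, ",".join(attribute_ids)

for line in ["12345    A=1,A=3,A=4,A=5,M=abc",
             "67890    A=1,A=2,M=xyz,M=zzz"]:
    for puid, attr_ids in transform(line):
        print(puid, attr_ids)
# abc 1,3,4,5
# xyz 1,2
# zzz 1,2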

This approach is basically a “left outer join”, with the first data set as the “left” table: all keys from the first data set are inserted into the tree. As the tree grows, subsequent insertions and updates take longer, so the approach works best when the first data set is relatively small. For example, in our production environment we run a similar but slightly more complicated join as part of a data pipeline; the smaller data set has anywhere between 50M and 80M records, while the larger one has 25B. Also, if the two data sets have only a small overlap, much processing time is wasted. A possible optimization in that case is to use some kind of pre-filtering mechanism, such as a Bloom filter, to avoid unnecessary tree insertions and lookups.
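
For completeness, here is a rough sketch of that idea: build a Bloom filter over the UIDs of the smaller (attribute) data set, then consult it while consuming the larger (mapping) data set so that records whose UID cannot be in the tree are skipped without a lookup. The hand-rolled filter below is purely illustrative; in practice you would size it from the expected number of UIDs and an acceptable false-positive rate, and how it plugs into the Hydra job depends on your setup:

import hashlib

class BloomFilter:
    # A minimal Bloom filter: membership tests have no false negatives and a
    # tunable rate of false positives, at a fraction of the memory of a set.
    def __init__(self, num_bits, num_hashes):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, item):
        # Derive the k bit positions from two halves of an MD5 digest.
        digest = hashlib.md5(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:], "big")
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item):
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(item))

# Build the filter from the UIDs of the smaller (attribute) data set ...
uid_filter = BloomFilter(num_bits=10_000_000, num_hashes=5)
for uid in ["12345", "67890", "11111", "22222"]:
    uid_filter.add(uid)

# ... then, while consuming the much larger mapping data set, skip records
# whose UID definitely is not in the tree, saving an insertion/lookup.
for uid, puid in [("12345", "abc"), ("99999", "qqq")]:
    print(uid, "update tree" if uid in uid_filter else "skip")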

Compared to a regular Hadoop join, some significant benefits of this approach are: 1) it can be done incrementally, and 2) the result is directly queryable using the powerful Hydra query language.