Extending Hadoop Pig for Hierarchical Data

I've been playing with Hadoop Pig lately and having a fun time. Pig is an easy-to-use language for writing Map Reduce jobs against Hadoop.

Our data is very hierarchical, and we calculate a lot of aggregates for a node by itself, for its child nodes, and for the node plus its children. We have a few tricks up our sleeves in SQL for handling these types of aggregates, but of course Map Reduce requires an entirely new way of thinking.

Luckily, Pig makes it easy to create User Defined Functions (UDFs) that extend the Pig language. I was able to take an existing Pig UDF, TOKENIZE, and alter it to suit my needs.

Specifically, our data looks like this:

111,/A/B/C
222,/A/B
333,/A/B/C


We need to answer questions such as "How many records for A and all of its children?" In this case, the answer is three. We also need to answer "How many records for just A?" which is zero, or "for just C?" which is two.

Our strategy is to take the path (e.g., /A/B/C) and split it into all the different paths contained within it. For example, /A/B/C can be split into:

  • /A
  • /A/B
  • /A/B/C
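
To make the split concrete, here is a minimal standalone sketch in plain Java with no Pig dependency. The class name PathPrefixes and the method expand are hypothetical, chosen only for illustration, and the delimiter is assumed to be a single character such as /.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical helper, not part of Pig: expands a path into all of its prefixes.
class PathPrefixes {

    // Assumes delimiter is a single character such as "/".
    static List<String> expand(String path, String delimiter) {
        List<String> prefixes = new ArrayList<>();
        StringTokenizer tok = new StringTokenizer(path, delimiter, false);
        StringBuilder sb = new StringBuilder();
        while (tok.hasMoreTokens()) {
            // Append one more segment, then record the prefix built so far.
            sb.append(delimiter).append(tok.nextToken());
            prefixes.add(sb.toString());
        }
        return prefixes;
    }

    public static void main(String[] args) {
        System.out.println(expand("/A/B/C", "/")); // prints [/A, /A/B, /A/B/C]
    }
}
```

Each prefix is emitted as soon as its last segment is appended, so the output is ordered from the root down to the full path.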

Now we can take the original record (e.g., 111) and attribute it to all three paths that stem from the original /A/B/C. Once every record has been expanded this way, we can calculate aggregates the way Map Reduce wants to: group by path, then count or sum. To see how this works, get the Pig source code and look for the TOKENIZE class. Below are my modifications:

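@Override
public DataBag exec(Tuple input) throws IOException {
    // Field 0 is the path to expand; field 1 is the delimiter (e.g. "/").
    // mBagFactory and mTupleFactory are fields of the original TOKENIZE class.
    Object o = input.get(0);
    if (o == null) {
        // Guard added so a null path yields null instead of a NullPointerException.
        return null;
    }
    if (!(o instanceof String)) {
        int errCode = 2114;
        String msg = "Expected input to be chararray, but" +
            " got " + o.getClass().getName();
        throw new ExecException(msg, errCode, PigException.BUG);
    }
    String delim = (String) input.get(1);
    DataBag output = mBagFactory.newDefaultBag();
    StringTokenizer tok = new StringTokenizer((String) o, delim, false);
    StringBuilder sb = new StringBuilder();
    // Grow the prefix one segment at a time and emit it after each segment,
    // so /A/B/C yields /A, /A/B, and /A/B/C.
    while (tok.hasMoreTokens()) {
        sb.append(delim);
        sb.append(tok.nextToken());
        output.add(mTupleFactory.newTuple(sb.toString()));
    }
    return output;
}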


The end result now looks like this:

111,/A
111,/A/B
111,/A/B/C
222,/A
222,/A/B
333,/A
333,/A/B
333,/A/B/C
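
Once the records are expanded, both kinds of question reduce to simple counts per path. The sketch below is plain Java rather than a Pig script, and the class name HierarchyCounts and its fields are hypothetical; it stands in for the group-then-count step by counting rows per path, first over the expanded data (self plus children) and then over the original data (self only).

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration, not part of Pig: counts records per path.
class HierarchyCounts {

    // The original records from the example: id, path.
    static final String[][] ORIGINAL = {
        {"111", "/A/B/C"}, {"222", "/A/B"}, {"333", "/A/B/C"}
    };

    // The same records after every path has been expanded into its prefixes.
    static final String[][] EXPANDED = {
        {"111", "/A"}, {"111", "/A/B"}, {"111", "/A/B/C"},
        {"222", "/A"}, {"222", "/A/B"},
        {"333", "/A"}, {"333", "/A/B"}, {"333", "/A/B/C"}
    };

    // Group rows by their path column and count the rows in each group.
    static Map<String, Integer> countByPath(String[][] rows) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String[] row : rows) {
            counts.merge(row[1], 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Self plus children: each record counts toward every ancestor path.
        System.out.println(countByPath(EXPANDED)); // prints {/A=3, /A/B=3, /A/B/C=2}
        // Self only: each record counts toward its own path alone.
        System.out.println(countByPath(ORIGINAL)); // prints {/A/B=1, /A/B/C=2}
    }
}
```

The expanded counts answer "A and all of its children" (three), while the original counts answer "just C" (two). /A does not appear in the second map at all, which is the "just A" answer of zero.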


To learn more about writing your own Pig UDFs, consult the Pig UDF Manual.