In this particular application, we need to 'thread' logged messages together by the fields they can be joined on. Different messages represent different pieces of state around a single meta-state, kind of like a session, that unifies them. Messages have a specific type; let's call those A, B, C, and D. The joining rules are:
- A joins B on field y
- B joins C on field y
- D joins A on field x,y,z
Split
The first step, prior to joining messages, is to separate them into relations that contain only A, B, C, or D messages, using the Pig SPLIT statement. SPLIT works like this:
SPLIT relation INTO something IF condition, something_else IF other_condition, .....;
Basically, SPLIT is a case statement, and I needed to write UDFs to implement the condition tests (for example, comparing an input GMT timestamp against a specified day).
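As a minimal sketch of the mechanics (the relation, field names, and literal value below are hypothetical, not the real log schema), a SPLIT with plain comparison conditions looks like this; when the test is more involved than a simple comparison, the condition becomes a filter UDF, which is what the next section covers:
RAW = LOAD 'messages.tsv' AS (type:chararray, gmt:chararray, x:chararray, y:chararray, z:chararray);
-- each tuple is routed into every relation whose condition it satisfies
SPLIT RAW INTO TARGET_DAY IF gmt == '2011-08-01', OTHER_DAYS IF gmt != '2011-08-01';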
Writing UDFs for the SPLIT
In previous posts I've written eval UDFs, which take input and transform it into something else. In this case I needed to implement filter UDFs, which return a boolean value based on their input.
I've found that the 'top down' approach works well when designing UDFs. By that I mean: write the UDFs as they would be used in the script:
SPLIT RAW_DATA INTO A IF IsA(*), B IF IsB(*), C IF IsC(*), D IF IsD(*);
and then implement them. Because the UDFs are boolean tests, I need four of them, one for each condition in the SPLIT statement above. Each one follows the same basic pattern:
import java.io.IOException;
import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;

public class IsA extends FilterFunc {
    @Override
    public Boolean exec(Tuple someTuple) throws IOException {
        return testForA(someTuple);
    }

    protected Boolean testForA(Tuple someTuple) {
        ..... // determine if this is a type A, or not.
    }
}
So the SPLIT statement above works as advertised, partitioning the original raw data out by message type.
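To wire the UDFs into the script, the jar has to be registered and the classes given short aliases. The jar name and package below are hypothetical placeholders; IsA(*) passes every field of the tuple to the filter function:
REGISTER my-udfs.jar;
DEFINE IsA com.example.pig.IsA();
DEFINE IsB com.example.pig.IsB();
DEFINE IsC com.example.pig.IsC();
DEFINE IsD com.example.pig.IsD();
-- each filter UDF sees the whole tuple and decides whether it belongs in that relation
SPLIT RAW_DATA INTO A IF IsA(*), B IF IsB(*), C IF IsC(*), D IF IsD(*);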
JOINing Relations
The next part of threading the messages together is to JOIN them along common fields. The JOIN statement combines two relations on a single field:
JOINED_AB = JOIN A BY y, B BY y;
NOTE that this JOIN is an inner join; outer joins are a whole other beast. It simply aggregates all the fields of A and B together, so the JOINED_AB relation looks like:
A::x, A::y, A::p, B::q, B::y, B::z
If you want an authoritative value of y for each tuple of JOINED_AB, you need to generate it explicitly:
JOINED_AB = FOREACH JOINED_AB GENERATE A::y AS y, .....;
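Using the JOINED_AB fields listed above, that disambiguating projection might look like the following sketch (keeping A's copy of y as the authoritative one):
JOINED_AB = FOREACH JOINED_AB GENERATE A::y AS y, A::x AS x, A::p AS p, B::q AS q, B::z AS z;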
In the case above, recall that
- A joins B on field y
- B joins C on field y
- D joins A on field x,y,z
To knit these fields together, you would:
JOINED_AB = JOIN A BY y, B BY y;
JOINED_AB = FOREACH JOINED_AB GENERATE B::y AS y, *;
JOINED_AB_C = JOIN JOINED_AB BY y, C BY y;
At this point we want to join D to JOINED_AB_C, but that needs to be done along a multi-column match, and JOIN only handles single-column matches. It's time to use COGROUP.
COGROUPing Relations
The first thing we need to do (for clarity) is to regenerate some of the fields in the JOINED_AB_C relation:
JOINED_AB_C = FOREACH JOINED_AB_C GENERATE A::x AS x, A::y AS y, A::z AS z, .....;
This allows us to COGROUP without having to dereference by sub-tuple:
ALL_DATA = COGROUP JOINED_AB_C BY (x,y,z), D BY (x,y,z);
This relation actually comprises all of the fields of A, B, C, and D, but because we joined A, B, and C into JOINED_AB_C before grouping it with D, the tuple structure looks like this:
ALL_DATA: (x,y,z), {JOINED_AB_C: {x, y, z, A::field1, B::field2, .....}, D: {x, y, z, ..}}
In other words: just as GROUP takes members of the same relation and binds tuples together by a common field, creating a group key and a bag that holds the matching tuples, COGROUP takes members of different relations, binds them by their common fields, and creates one bag per relation holding the tuples that match each group key. In fact the COGROUP and GROUP operators are the same; it's just common practice to use COGROUP when grouping multiple relations and GROUP when grouping a single relation.
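To get from the cogrouped structure back to flat, per-thread records, the bags can be filtered and flattened. This is a hedged sketch rather than the actual production script; it assumes inner-join semantics, i.e. groups where either bag is empty are dropped:
-- keep only groups where both relations contributed at least one tuple
ALL_DATA = FILTER ALL_DATA BY NOT IsEmpty(JOINED_AB_C) AND NOT IsEmpty(D);
-- FLATTEN crosses the two bags, yielding one flat tuple per matching pair
THREADED = FOREACH ALL_DATA GENERATE FLATTEN(JOINED_AB_C), FLATTEN(D);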