Monday, December 20, 2010

Pig SPLITs, JOINs, and COGROUPs to manipulate multiple relations

I've been playing around with Pig and UDFs for the last couple of weeks as we try to convert an application's ETL transforms from SQL to Pig.

In this particular application, we need to 'thread' logged messages together by fields that they can be joined on. Different messages represent different state around a single meta-state, kind of like a session, that unifies the different messages. Messages have a specific type; let's call those A, B, C, and D. The joining rules are:

  • A joins B on field y
  • B joins C on field y
  • D joins A on field x,y,z

Split

The first step prior to joining messages is to separate them into relations that contain only A, B, C, or D messages, using the Pig SPLIT statement. SPLIT works like this:

SPLIT relation INTO something IF condition, something_else IF other_condition, ...;

Basically SPLIT is a case statement, and I needed to write UDFs to implement the condition tests that determine which type each message is.
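For conditions that can be expressed directly against a field, no UDF is needed at all; here is a minimal sketch (the bytes field and the SMALL/LARGE aliases are made up, not part of the real application):

SPLIT RAW_DATA INTO SMALL IF bytes < 1024, LARGE IF bytes >= 1024;

In my case the type test is not a simple comparison, which is why the conditions below are UDFs.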

Writing UDFs for the SPLIT

In previous posts I've written eval UDFs. Those take input and transform it to something else. In this case I needed to implement filter UDFs. Filter UDFs return a boolean value based on their input.

I've found that the 'top down' approach works well when designing UDFs. By that I mean write the UDFs as they would be used in script:

SPLIT RAW_DATA INTO A IF isA(), B IF isB(), C IF isC(), D IF isD();

and then implement them. Since each condition in the SPLIT statement above needs its own boolean test, I need four filter UDFs, one per message type. Each of them follows the same pattern:

public class IsA extends FilterFunc {

 @Override
 public Boolean exec(Tuple someTuple) throws IOException {
  return testForA(someTuple);
 }

 protected Boolean testForA(Tuple someTuple) {
  ..... // determine if this is a type A, or not.
 }

}

So the SPLIT statement above works as advertised, partitioning the original raw data out by message type.
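One quick way to sanity-check the partitioning is to count each resulting relation (a sketch; GROUP ... ALL and the COUNT built-in are standard Pig, the A_GROUPED/A_COUNT names are mine):

A_GROUPED = GROUP A ALL;
A_COUNT = FOREACH A_GROUPED GENERATE COUNT(A);
DUMP A_COUNT;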

JOINing Relations

The next part of threading the messages together is to JOIN them along common fields. The JOIN statement joins two relations on a single field:

JOINED_AB = JOIN A BY y, B BY y;

NOTE that this JOIN is an inner join; outer joins are a whole other beast. It simply aggregates all of the fields of A and B together, so the JOINED_AB relation looks like:

A::x, A::y, A::p, B::q, B::y, B::z
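If you want to see exactly which names Pig has assigned, DESCRIBE prints the schema of a relation (the commented output below is just a sketch; the actual types depend on how A and B were loaded):

DESCRIBE JOINED_AB;
-- something like: JOINED_AB: {A::x: ..., A::y: ..., A::p: ..., B::q: ..., B::y: ..., B::z: ...}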

If you want to have an authoritative value of y for each tuple of JOINED_AB, you would need to explicitly generate it:

JOINED_AB = FOREACH JOINED_AB GENERATE A::y as y, .....;

In the case above, recall that

  • A joins B on field y
  • B joins C on field y
  • D joins A on field x,y,z

to knit these fields together, you would

JOINED_AB = JOIN A BY y, B BY y;

JOINED_AB = FOREACH JOINED_AB GENERATE B::y as y, *;

JOINED_AB_C = JOIN JOINED_AB BY y, C BY y;

At this point we want to join D to JOINED_AB_C, but that match needs to happen across multiple columns, and JOIN only handles single column matches. It's time to use COGROUP.

COGROUPing Relations

The first thing we need to do (for clarity) is to regenerate some of the fields in the JOINED_AB_C relation:

JOINED_AB_C = FOREACH JOINED_AB_C GENERATE A::x as x, A::y as y, A::z as z; 
 
This allows us to COGROUP without having to dereference by sub-tuple:

ALL_DATA = COGROUP JOINED_AB_C BY (x,y,z), D BY (x,y,z);

This relation actually comprises all of the fields of A, B, C, and D, but because we joined A, B, and C into JOINED_AB_C before cogrouping it with D, the tuple structure looks like this:

ALL_DATA: (x,y,z), {JOINED_AB_C: {JOINED_AB::x, JOINED_AB::y, JOINED_AB::z, JOINED_AB::A::field1, JOINED_AB::B::field2}, D: {x,y,z,..}}

In other words, COGROUP works just like GROUP. GROUP takes members of a single relation and bins its tuples by a common key, creating a group and a bag that holds the matching tuples; COGROUP takes members of different relations, binds them by their common fields, and creates one bag per relation holding the tuples that share those field values. In fact COGROUP and GROUP are the same operation -- it's just common practice to use COGROUP when grouping multiple relations and GROUP when grouping a single relation.
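To make the parallel concrete, here is a small sketch (GROUPED, COGROUPED, and THREADED are illustrative names, not part of the original script). GROUP produces one bag per key, COGROUP produces one bag per key per relation, and FLATTEN can turn the cogrouped bags back into flat, threaded records:

GROUPED = GROUP A BY y;                 -- (group, {tuples of A})
COGROUPED = COGROUP A BY y, B BY y;     -- (group, {tuples of A}, {tuples of B})

-- a possible final step for the threading example above:
THREADED = FOREACH ALL_DATA GENERATE FLATTEN(JOINED_AB_C), FLATTEN(D);

Note that FLATTENing both bags gives inner-join semantics: any group with an empty bag on either side disappears from THREADED.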

Wednesday, December 1, 2010

Writing a custom PIG Loader

Foreword

I've been pretty happy using the default Pig loader, PigStorage, which takes the field delimiter of a CSV as an optional argument and loads tuples into memory as specified:

A = LOAD '/csv/input/inputs*' USING PigStorage() AS (field1,field2,..fieldN);
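PigStorage also accepts the delimiter explicitly, so a comma-delimited file could be loaded like this (path and field names here are made up):

B = LOAD '/csv/input/other*' USING PigStorage(',') AS (field1,field2);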

However I'm in the middle of doing some transforms on a CSV with ~226 fields. Yikes. For most of these transforms we don't need all 226 fields; in fact we probably only need a reasonable subset, but which subset depends on what we are trying to do. Ideally I'd like to be able to extract the values I want into a tuple like this:

A = LOAD 'somefile' using CustomLoader(1,4,45,100...) as (timestamp:long, id:long, url:chararray...);

So, it's time to write a custom loader.

Setup

Hadoop
I first installed the CDH3 distro of Hadoop -- specifically the pseudo mode configuration, which runs all Hadoop core services, i.e. namenode, datanode, jobtracker, and tasktracker, on a single box. I then installed hadoop-pig. Cloudera makes this easy by leveraging apt-get and installing to the standard *nix hierarchical locations. The CDH3 version of Hadoop uses /etc/alternatives to allow for easy version switching, and logs reside in the usual /var/log location.

sudo apt-get install hadoop-0.20-conf-pseudo

After starting core services as described in the link, I installed CDH3 Pig (version 0.7.0) via apt-get:

sudo apt-get install hadoop-pig

CDH3 Pig installs the pig shell script in /usr/bin, and provides libs in /usr/lib/pig. In order to run the pig shell, you need to set JAVA_HOME to /usr/lib/jvm/java-6-sun.

Mavenization

With the necessary services installed, I set up a maven project, mainly for brainless dependency management:

mvn archetype:create  -DarchetypeGroupId=org.apache.maven.archetypes 
-DgroupId=org.arunxarun.data.prototypes  -DartifactId=pigloader

I then wanted to bring in the pig jars as dependencies. I found hadoop-0.20-core in the mvnrepository, but could not find pig.jar or pig-core.jar in any maven repository. So I installed the pig and pig-core jars to my local repository from the /usr/lib/pig directory where they had been put by the apt-get install. I did that after creating versionless symlinks to the real jars whose names contained version information:

mvn install:install-file -Dfile=/usr/lib/pig/pig-core.jar -DgroupId=org.apache.hadoop -DartifactId=hadoop-pig-core -Dversion=0.7.0 -Dpackaging=jar

mvn install:install-file -Dfile=/usr/lib/pig/pig.jar -DgroupId=org.apache.hadoop -DartifactId=hadoop-pig -Dversion=0.7.0 -Dpackaging=jar


Finally, I made sure that the dependencies were referenced in my pom file:
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-pig-core</artifactId>
  <version>0.7.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-pig</artifactId>
  <version>0.7.0</version>
</dependency>

Implementation

As of 0.7.0, Pig loaders extend the LoadFunc abstract class. This means they need to override 4 methods:
  • getInputFormat() this method returns to the caller an instance of the InputFormat that the loader supports. The actual load process needs an instance to use at load time, and doesn't want to place any constraints on how that instance is created.
  • prepareToRead() is called prior to reading a split. It passes in the reader used during the reads of the split, as well as the actual split. The implementation of the loader usually keeps the reader, and may want to access the actual split if needed.
  • setLocation() Pig calls this to communicate the load location to the loader, which is responsible for passing that information to the underlying InputFormat object. This method can be called multiple times, so there should be no state associated with the method (unless that state gets reset when the method is called).
  • getNext() Pig calls this to get the next tuple from the loader once all setup has been done. If this method returns null, Pig assumes that all information in the split passed via the prepareToRead() method has been processed. 
Here is the current implementation. Note that the constructor takes a varargs array of Strings, which is the only kind of constructor argument a Pig loader can take. Also note, per the list above, that the RecordReader is set in prepareToRead() but actually used in getNext().

// imports for Pig 0.7 / Hadoop 0.20 (the new org.apache.hadoop.mapreduce API)
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class CustomLoader extends LoadFunc {

 private static final String DELIM = "\t";
 private static final int DEFAULT_LIMIT = 226;
 private int limit = DEFAULT_LIMIT;
 private RecordReader reader;
 private List<Integer> indexes;
 private TupleFactory tupleFactory;

 /**
  * Pig Loaders only take string parameters. The CTOR is really the only interaction
  * the user has with the Loader from the script.  
  * @param indexesAsStrings
  */
 public CustomLoader(String...indexesAsStrings) {
  this.indexes = new ArrayList<Integer>();
  for(String indexAsString : indexesAsStrings) {
   indexes.add(Integer.valueOf(indexAsString));
  }
  
  tupleFactory = TupleFactory.getInstance();
 }
 
 
 @Override
 public InputFormat getInputFormat() throws IOException {
   return new TextInputFormat();

 }

 /**
  * the input in this case is a TSV, so split it, make sure that the requested indexes are valid, 
  */
 @Override
 public Tuple getNext() throws IOException {
  Tuple tuple = null;
  List<String> values = new ArrayList<String>();
  
  try {
   boolean notDone = reader.nextKeyValue();
   if (!notDone) {
       return null;
   }
   Text value = (Text) reader.getCurrentValue();
   
   if(value != null) {
    String parts[] = value.toString().split(DELIM);
    
    for(Integer index : indexes) {
     
      if(index >= limit) {
       throw new IOException("index " + index + " is out of bounds: max index = " + (limit - 1));
     } else {
      values.add(parts[index]);
     }
    }
    
    tuple = tupleFactory.newTuple(values);
   }
   
  } catch (InterruptedException e) {
   // add more information to the runtime exception condition. 
   int errCode = 6018;
            String errMsg = "Error while reading input";
            throw new ExecException(errMsg, errCode,
                    PigException.REMOTE_ENVIRONMENT, e);
  }

  return tuple;

 }

 @Override
 public void prepareToRead(RecordReader reader, PigSplit pigSplit)
   throws IOException {
  this.reader = reader; // note that for this Loader, we don't care about the PigSplit.
 }

 @Override
 public void setLocation(String location, Job job) throws IOException {
  FileInputFormat.setInputPaths(job, location); // the location is assumed to be comma separated paths. 

 }

} 
 
Testing
Testing a Pig UDF requires two steps: basic unit testing and integration testing via a script. I'm including this section because it also shows how the loader is accessed via Pig Latin.

Unit Testing: Mocking the Reader
I've implemented a MockRecordReader that I can pass into my CustomLoader via prepareToRead(). The MockRecordReader will be accessed when getNext() is called. Note that I've only implemented the methods I need. This is by no means a fully functional RecordReader:


public class MockRecordReader extends RecordReader<Long, Text> {

 private BufferedReader reader;
 private long key;
 private boolean linesLeft;

 
 /**
  * call this to load the file
  * @param fileLocation
  * @throws FileNotFoundException 
  */
 
 
 public MockRecordReader(String fileLocation) throws FileNotFoundException {
  reader  = new BufferedReader(new FileReader(fileLocation));
  key = 0;
  linesLeft = true;
  }

 @Override
  public void close() throws IOException {
   reader.close();
  }

 @Override
  public Long getCurrentKey() throws IOException, InterruptedException {
   return key;
  }

 @Override
  public Text getCurrentValue() throws IOException, InterruptedException {
   String line = reader.readLine();

   if(line == null) {
    // no more lines: flag that we're done and avoid constructing a Text from null
    linesLeft = false;
    return null;
   }
   key++;
   return new Text(line);
  }

 @Override
  public float getProgress() throws IOException, InterruptedException {
   // dont need this for unit testing
   return 0;
  }

 @Override
  public void initialize(InputSplit arg0, TaskAttemptContext arg1)
      throws IOException, InterruptedException {
   // not initializing anything during unit testing.
   
  }

 @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
   return linesLeft;
  }

}


Implementing unit tests using the MockRecordReader is super easy. Note that I load the MockRecordReader up with some fake data for testing purposes.

public class CustomLoaderTest {

 @Test
 public void testValidInput() throws Exception{
  MockRecordReader reader = new MockRecordReader("src/test/resources/valid1line_hit_data.tsv");
  
  CustomLoader custLoader = new CustomLoader("0","2","4");
  
  custLoader.prepareToRead(reader, null);
  
  Tuple tuple = custLoader.getNext();
  
  assertNotNull(tuple);
  
  String ts = (String)tuple.get(0);
  assertNotNull(ts);
  assertEquals(ts,"1130770920");
  
  String language = (String)tuple.get(1);
  assertEquals(language,"en-ca");
  String someCt = (String)tuple.get(2);
  assertEquals(someCt,"675");
  
  
 }
 
 @Test(expected=IOException.class)
 public void testInvalidInput() throws Exception {
  MockRecordReader reader = new MockRecordReader("src/test/resources/valid1line_hit_data.tsv");
  
  CustomLoader custLoader = new CustomLoader("300");
  
  custLoader.prepareToRead(reader, null);
  
  Tuple tuple = custLoader.getNext();
 }
  
}

Integration Testing
Now that I know I haven't generated any NPEs from basic usage (NOTE: there are plenty more tests that I could do around bad format), it's integration test time. Integration testing a loader via Pig Latin is pretty simple: load data, then dump it, and validate that it looks like it should. Right now this is manual, basically running the script below, but output could/should be automatically validated.

Note that in order to use the UDF I've written, I need to specifically register it as shown in the first line below.

register '../../../target/CustomLoader-1.0-SNAPSHOT.jar';

-- the loader is fully path specified, and args are passed in using single quotes.
-- the file being loaded exists at the specified location in HDFS

A = LOAD '/test/hit_data.tsv' USING com.foo.bar.CustomLoader('0','2','6','19') AS (zero:long,two:chararray,six:long,nineteen:chararray); 
 
C =  GROUP A BY zero;
 
-- this forces pig to execute the query plan up to the DUMP, which means invoking the loader. 

DUMP C;

-- note that the same loader can be invoked with a different number of arguments, and
-- fields don't have to be cast
-- the file being loaded exists at the specified location in HDFS

B = LOAD '/test/hit_data.tsv' USING com.foo.bar.CustomLoader('100','200');

DUMP B;


Conclusion
Writing the code took about 10 minutes. Testing it took much longer. That seems to be the pattern for me when writing (simple) UDFs. What I've noticed about Pig scripts and UDFs is that, in order to validate functionality throughout the script/UDF lifecycle, you always need to inspect the generated tuples to feel confident that changes have been made without regression.

Other than the lack of automation around integration testing, the actual Loader works as advertised -- it might need to change to accommodate new requirements, but it will work just fine for prototypical work with multi-column CSV files.