Wednesday, December 1, 2010

Writing a custom PIG Loader

Foreword

I've been pretty happy using the default pig loader, which takes as input the delimiter of a CSV, and loads tuples into memory as specified:

A = LOAD '/csv/input/inputs*' USING PigStorage() AS (field1,field2,..fieldN);

However, I'm in the middle of doing some transforms on a CSV with about 226 fields. Yikes. Most of these transforms don't need all 226 fields; they only need a subset, and which subset depends on what we are trying to do. Ideally I'd like to be able to extract just the values I want into a tuple like this:

A = LOAD 'somefile' USING CustomLoader('1','4','45','100'...) AS (timestamp:long, id:long, url:chararray...);

So, it's time to write a custom loader.

Setup

Hadoop
I first installed the CDH3 distro of Hadoop, specifically the pseudo-distributed configuration, which runs all Hadoop core services (namenode, datanode, jobtracker and tasktracker) on a single box. I then installed hadoop-pig. Cloudera makes this easy by leveraging apt-get and installing to the standard *nix hierarchical locations. The CDH3 version of Hadoop uses /etc/alternatives to allow for easy version switching, and logs reside in the usual /var/log location.

sudo apt-get install hadoop-0.20-conf-pseudo

After starting the core services as described in the link, I installed CDH3 Pig (version 0.7.0) via apt-get:

sudo apt-get install hadoop-pig

CDH3 Pig installs the pig shell script in /usr/bin, and provides libs in /usr/lib/pig. In order to run the pig shell, you need to set JAVA_HOME to /usr/lib/jvm/java-6-sun.

Mavenization

With the necessary services installed, I set up a maven project, mainly for brainless dependency management:

mvn archetype:create -DarchetypeGroupId=org.apache.maven.archetypes \
  -DgroupId=org.arunxarun.data.prototypes -DartifactId=pigloader

I then wanted to bring in the pig jars as dependencies. I found hadoop-0.20-core in the mvnrepository, but could not find pig.jar or pig-core.jar in any maven repository. So I installed the pig and pig-core jars to my local repository from the /usr/lib/pig directory where they had been put by the apt-get install. I did that after creating versionless symlinks to the real jars whose names contained version information:

mvn install:install-file -Dfile=/usr/lib/pig/pig-core.jar -DgroupId=org.apache.hadoop -DartifactId=hadoop-pig-core -Dversion=0.7.0 -Dpackaging=jar

mvn install:install-file -Dfile=/usr/lib/pig/pig.jar -DgroupId=org.apache.hadoop -DartifactId=hadoop-pig -Dversion=0.7.0 -Dpackaging=jar


Finally, I made sure that the dependencies were referenced in my pom file:
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-pig-core</artifactId>
      <version>0.7.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-pig</artifactId>
      <version>0.7.0</version>
    </dependency>

Implementation

As of 0.7.0, Pig loaders extend the LoadFunc abstract class. This means they need to override four methods:
  • getInputFormat() returns an instance of the InputFormat that the loader supports. The actual load process needs an instance to use at load time, and doesn't want to place any constraints on how that instance is created.
  • prepareToRead() is called prior to reading a split. It passes in the reader used during the reads of the split, as well as the actual split. The implementation of the loader usually keeps the reader, and may want to access the actual split if needed.
  • setLocation() is called by Pig to communicate the load location to the loader, which is responsible for passing that information to the underlying InputFormat object. This method can be called multiple times, so there should be no state associated with the method (unless that state gets reset when the method is called).
  • getNext() is called by Pig to get the next tuple from the loader once all setup has been done. If this method returns null, Pig assumes that all information in the split passed via the prepareToRead() method has been processed.
Here is the current implementation. Note that the constructor takes a varargs set of Strings, which is the only kind of argument that can be passed to a Pig loader. Also note that the RecordReader is set in prepareToRead() but actually used in getNext().

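import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class CustomLoader extends LoadFunc {

 private static final String DELIM = "\t";
 private static final int DEFAULT_LIMIT = 226;
 private int limit = DEFAULT_LIMIT;
 private RecordReader reader;
 private List<Integer> indexes;
 private TupleFactory tupleFactory;

 /**
  * Pig Loaders only take string parameters. The CTOR is really the only interaction
  * the user has with the Loader from the script.
  * @param indexesAsStrings zero-based field indexes to extract, as strings
  */
 public CustomLoader(String... indexesAsStrings) {
  this.indexes = new ArrayList<Integer>();
  for (String indexAsString : indexesAsStrings) {
   indexes.add(Integer.valueOf(indexAsString));
  }

  tupleFactory = TupleFactory.getInstance();
 }

 @Override
 public InputFormat getInputFormat() throws IOException {
  return new TextInputFormat();
 }

 /**
  * The input in this case is a TSV, so split each line, make sure that the
  * requested indexes are valid, and copy the requested fields into a tuple.
  */
 @Override
 public Tuple getNext() throws IOException {
  Tuple tuple = null;
  List<Object> values = new ArrayList<Object>();

  try {
   boolean notDone = reader.nextKeyValue();
   if (!notDone) {
    return null;
   }
   Text value = (Text) reader.getCurrentValue();

   if (value != null) {
    // a limit of -1 keeps trailing empty fields, which split() drops by default
    String[] parts = value.toString().split(DELIM, -1);

    for (Integer index : indexes) {
     // valid indexes run from 0 to limit - 1 and must also exist in this line
     if (index < 0 || index >= limit || index >= parts.length) {
      throw new IOException("index " + index + " is out of bounds: max index = "
        + (Math.min(limit, parts.length) - 1));
     } else {
      values.add(parts[index]);
     }
    }

    tuple = tupleFactory.newTuple(values);
   }

  } catch (InterruptedException e) {
   // add more information to the runtime exception condition.
   int errCode = 6018;
   String errMsg = "Error while reading input";
   throw new ExecException(errMsg, errCode,
     PigException.REMOTE_ENVIRONMENT, e);
  }

  return tuple;
 }

 @Override
 public void prepareToRead(RecordReader reader, PigSplit pigSplit)
   throws IOException {
  this.reader = reader; // note that for this Loader, we don't care about the PigSplit.
 }

 @Override
 public void setLocation(String location, Job job) throws IOException {
  FileInputFormat.setInputPaths(job, location); // the location is assumed to be comma separated paths.
 }

}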
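The field-selection step inside getNext() can also be exercised without Hadoop or Pig on the classpath. What follows is a minimal sketch, not part of the loader itself: the class name ColumnProjector and its project() method are hypothetical names introduced here for illustration, and the code assumes Java 7 or later. It shows the two details that are easy to get wrong: String.split() silently drops trailing empty fields unless it is given a negative limit, and the requested index has to be checked against the number of fields actually present in the line.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Pig: isolates the "pick columns by index" step.
final class ColumnProjector {

    private final List<Integer> indexes = new ArrayList<>();

    // Mirrors the loader's constructor: Pig passes every argument as a String.
    ColumnProjector(String... indexesAsStrings) {
        for (String s : indexesAsStrings) {
            int index = Integer.parseInt(s.trim()); // NumberFormatException on bad input
            if (index < 0) {
                throw new IllegalArgumentException("negative index: " + index);
            }
            indexes.add(index);
        }
    }

    // Splits one line and returns the requested fields, in the requested order.
    // Note that the delimiter is interpreted as a regular expression by split().
    List<String> project(String line, String delimiter) throws IOException {
        // A limit of -1 keeps trailing empty fields; the default split drops them.
        String[] parts = line.split(delimiter, -1);
        List<String> values = new ArrayList<>(indexes.size());
        for (int index : indexes) {
            if (index >= parts.length) {
                throw new IOException("index " + index + " is out of bounds: line has "
                        + parts.length + " fields");
            }
            values.add(parts[index]);
        }
        return values;
    }
}
```

Keeping this logic in a plain class would make it possible to test edge cases such as empty trailing columns without constructing a RecordReader at all.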
 
Testing
Testing a Pig UDF requires two steps: basic unit testing and integration testing via a script. I'm including this section because it also shows how the loader is accessed via Pig Latin.

Unit Testing: Mocking the Reader
I've implemented a MockRecordReader that I can pass into my CustomLoader via prepareToRead(). The MockRecordReader will be accessed when getNext() is called. Note that I've only implemented the methods I need. This is by no means a fully functional RecordReader:


import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MockRecordReader extends RecordReader<Long, Text> {

 private BufferedReader reader;
 private long key;
 private Text currentValue;

 /**
  * call this to load the file
  * @param fileLocation
  * @throws FileNotFoundException
  */
 public MockRecordReader(String fileLocation) throws FileNotFoundException {
  reader = new BufferedReader(new FileReader(fileLocation));
  key = 0;
 }

 @Override
 public void close() throws IOException {
  reader.close();
 }

 @Override
 public Long getCurrentKey() throws IOException, InterruptedException {
  return key;
 }

 @Override
 public Text getCurrentValue() throws IOException, InterruptedException {
  // returns whatever the last call to nextKeyValue() read
  return currentValue;
 }

 @Override
 public float getProgress() throws IOException, InterruptedException {
  // don't need this for unit testing
  return 0;
 }

 @Override
 public void initialize(InputSplit arg0, TaskAttemptContext arg1)
   throws IOException, InterruptedException {
  // not initializing anything during unit testing.
 }

 @Override
 public boolean nextKeyValue() throws IOException, InterruptedException {
  // advance to the next line here, so that end of file is reported
  // before the caller ever asks for a value
  String line = reader.readLine();
  if (line == null) {
   currentValue = null;
   return false;
  }
  key++;
  currentValue = new Text(line);
  return true;
 }

}


Writing unit tests with the MockRecordReader is straightforward. Note that I load the MockRecordReader with some fake data for testing purposes.

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.io.IOException;

import org.apache.pig.data.Tuple;
import org.junit.Test;

public class CustomLoaderTest {

 @Test
 public void testValidInput() throws Exception {
  MockRecordReader reader = new MockRecordReader("src/test/resources/valid1line_hit_data.tsv");

  CustomLoader custLoader = new CustomLoader("0", "2", "4");

  custLoader.prepareToRead(reader, null);

  Tuple tuple = custLoader.getNext();

  assertNotNull(tuple);

  // assertEquals takes the expected value first, then the actual value
  String ts = (String) tuple.get(0);
  assertNotNull(ts);
  assertEquals("1130770920", ts);

  String language = (String) tuple.get(1);
  assertEquals("en-ca", language);
  String someCt = (String) tuple.get(2);
  assertEquals("675", someCt);
 }

 @Test(expected = IOException.class)
 public void testInvalidInput() throws Exception {
  MockRecordReader reader = new MockRecordReader("src/test/resources/valid1line_hit_data.tsv");

  CustomLoader custLoader = new CustomLoader("300");

  custLoader.prepareToRead(reader, null);

  // index 300 is past the last field, so this call should throw
  custLoader.getNext();
 }

}
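The tests above read a fixture file from src/test/resources. One alternative is to generate the fake data inside the test, so that the expected values sit next to the assertions. This is a rough sketch only: TsvFixture and its write() method are hypothetical names that do not appear in the project, and the code assumes Java 8 or later for String.join.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical test helper: writes rows of fields to a temporary TSV file.
final class TsvFixture {

    private TsvFixture() {
    }

    // Returns the path of a temp file holding one tab-separated line per row.
    static Path write(List<String[]> rows) throws IOException {
        List<String> lines = new ArrayList<>();
        for (String[] row : rows) {
            lines.add(String.join("\t", row));
        }
        Path file = Files.createTempFile("hit_data", ".tsv");
        file.toFile().deleteOnExit(); // clean up when the JVM exits
        Files.write(file, lines, StandardCharsets.UTF_8);
        return file;
    }
}
```

A test could then pass TsvFixture.write(rows).toString() to the MockRecordReader constructor in place of the hard-coded path.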

Integration Testing
Now that I know I haven't generated any NPEs from basic usage (note: there are plenty more tests that I could write around badly formatted input), it's integration test time. Integration testing a loader via Pig Latin is pretty simple: load data, then dump it, and validate that it looks like it should. Right now this is manual, basically running the script below and eyeballing the result, but the output could and should be validated automatically.

Note that in order to use the UDF I've written, I need to specifically register it as shown in the first line below.

register '../../../target/CustomLoader-1.0-SNAPSHOT.jar';

-- the loader is referenced by its fully qualified class name, and args are passed in using single quotes.
-- the file being loaded exists at the specified location in HDFS

A = LOAD '/test/hit_data.tsv' USING com.foo.bar.CustomLoader('0','2','6','19') AS (zero:long,two:chararray,six:long,nineteen:chararray); 
 
C =  GROUP A BY zero;
 
-- this forces pig to execute the query plan up to the DUMP, which means invoking the loader. 

DUMP C;

-- note that the same loader can be invoked with a different number of arguments, and
-- fields don't have to be cast
-- the file being loaded exists at the specified location in HDFS

B = LOAD '/test/hit_data.tsv' USING com.foo.bar.CustomLoader('100','200');

DUMP B;
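As a first step towards automating that validation, the dumped output could be captured to a text file and checked for the expected number of fields per tuple. The sketch below rests on several assumptions: that each dumped tuple appears on its own line in the form (a,b,c), that no field contains a comma or parenthesis, and that only flat tuples are checked (so it fits the output of DUMP B, not the grouped output of DUMP C). DumpValidator and its methods are hypothetical names, not part of Pig.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical checker for captured DUMP output; handles flat tuples only.
final class DumpValidator {

    private DumpValidator() {
    }

    // Parses a line such as "(100,abc)" into its fields; rejects anything else.
    static List<String> parseFlatTuple(String line) {
        String trimmed = line.trim();
        if (trimmed.length() < 2 || !trimmed.startsWith("(") || !trimmed.endsWith(")")) {
            throw new IllegalArgumentException("not a tuple: " + line);
        }
        String body = trimmed.substring(1, trimmed.length() - 1);
        List<String> fields = new ArrayList<>();
        // A limit of -1 keeps empty fields instead of dropping trailing ones.
        for (String field : body.split(",", -1)) {
            fields.add(field);
        }
        return fields;
    }

    // Returns the 1-based numbers of lines whose field count differs from expected.
    static List<Integer> linesWithWrongArity(List<String> lines, int expectedFields) {
        List<Integer> bad = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            if (parseFlatTuple(lines.get(i)).size() != expectedFields) {
                bad.add(i + 1);
            }
        }
        return bad;
    }
}
```

For the second LOAD above, which requests two fields, a check would call linesWithWrongArity(lines, 2) and fail the build if the returned list is not empty.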


Conclusion
Writing the code took about 10 minutes. Testing it took much longer. That seems to be the pattern for me when writing (simple) UDFs. What I've noticed about Pig scripts and UDFs is that, throughout the script/UDF lifecycle, you always need to validate the generated tuples to feel confident that changes have been made without regression.

Other than the lack of automation around integration testing, the actual loader works as advertised. It might need to change to accommodate new requirements, but it will work just fine for prototyping work with multi-column CSV files.

8 comments:

  1. Nice post!

    Thanks.

  2. Indeed. This is awesome. We will have to start calling you Bootstrap Bill.

  3. "we don't need all 226 fields ..." - sounds like a good use case for zebra storage if your input source is going to be consumed by multiple consumers.

  4. Can you show an example of custom XMLLoader using pig0.6

  5. Thanks for this great article...

  6. awesome... can you post the code please..

  7. Thanks a lot. Very useful code

  8. java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected

    Gets this error...
    I am getting this only when i run my custom loader.. Other times , its working fine.
