Monday, December 20, 2010

Pig SPLITs, JOINs, and COGROUPs to manipulate multiple relations

I've been playing around with Pig and UDFs for the last couple of weeks as we convert an application that uses SQL for its ETL into one that uses Pig for the same transforms.

In this particular application, we need to 'thread' logged messages together by fields that they can be joined on. Different messages represent different state around a single meta-state, kind of like a session, that unifies the different messages. Messages can have a specific type; let's call those A, B, C, and D. The joining rules are:

  • A joins B on field y
  • B joins C on field y
  • D joins A on field x,y,z

Split

The first step prior to joining messages is to separate them into relations that only contain A,B,C, or D messages using the Pig SPLIT statement. SPLIT works like this:

SPLIT relation INTO something IF condition, something_else IF other_condition, ...;

Basically, SPLIT is a case statement, and I needed to write filter UDFs to implement the condition tests.

Writing UDFs for the SPLIT

In previous posts I've written eval UDFs. Those take input and transform it to something else. In this case I needed to implement filter UDFs. Filter UDFs return a boolean value based on their input.

I've found that the 'top down' approach works well when designing UDFs. By that I mean write the UDFs as they would be used in script:

SPLIT RAW_DATA INTO A IF isA(), B IF isB(), C IF isC(), D IF isD();

and then implement them. Because the UDFs are boolean filters and I need to perform four tests in the SPLIT statement above, I need four of them. Each one follows the same basic pattern:

public class IsA extends FilterFunc {

    @Override
    public Boolean exec(Tuple someTuple) throws IOException {
        return testForA(someTuple);
    }

    protected Boolean testForA(Tuple someTuple) {
        ..... // determine if this is a type A, or not.
    }
}
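As an illustration only (the real check depends on the message format, which isn't shown here), a filled-in version of that pattern might look like the following. The assumption that the message type lives in field 0 of the raw tuple is mine, not the post's:

import java.io.IOException;

import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;

public class IsA extends FilterFunc {

    // hypothetical: assumes field 0 of the raw tuple carries the message type
    private static final int TYPE_INDEX = 0;

    @Override
    public Boolean exec(Tuple someTuple) throws IOException {
        return testForA(someTuple);
    }

    protected Boolean testForA(Tuple someTuple) throws IOException {
        if (someTuple == null || someTuple.size() <= TYPE_INDEX) {
            return false;
        }
        Object type = someTuple.get(TYPE_INDEX);
        return type != null && "A".equals(type.toString());
    }
}

The other three filters (IsB, IsC, IsD) would differ only in the value they test for.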

So the SPLIT statement above works as advertised, partitioning the original raw data out by message type.

JOINing Relations

The next part of threading the messages together is to JOIN them along common fields. The JOIN statement joins relations on a single field:

JOINED_AB = JOIN A BY y, B BY y;

NOTE that this JOIN is an inner join; outer joins are a whole other beast. It simply aggregates all of the fields of A and B together, so the JOINED_AB relation looks like:

A::x, A::y, A::p, B::q, B::y, B::z

If you want to have an authoritative value of y for each tuple of JOINED_AB, you would need to explicitly generate it:

JOINED_AB = FOREACH JOINED_AB GENERATE A::y as y, .....;

In the case above, recall that

  • A joins B on field y
  • B joins C on field y
  • D joins A on field x,y,z

To knit these fields together, you would:

JOINED_AB = JOIN A BY y, B BY y;

JOINED_AB = FOREACH JOINED_AB GENERATE B::y as y, *;

JOINED_AB_C = JOIN JOINED_AB BY y, C BY y;

At this point we want to join D to JOINED_AB_C, but that needs to be done along a multiple column match, and JOIN only handles single column matches. It's time to use COGROUP.

COGROUPing Relations

The first thing we need to do (for clarity) is to regenerate some of the fields of JOINED_AB_C into a new relation, JOINED:

JOINED = FOREACH JOINED_AB_C GENERATE A::x as x, A::y as y, A::z as z;
 
This allows us to COGROUP without having to dereference by sub-tuple:

ALL_DATA = COGROUP JOINED BY (x,y,z), D BY (x,y,z);

This relation is actually comprised of all of the fields of A, B, C, and D, but because we joined A, B, and C into JOINED before cogrouping with D, the tuple structure looks like this:

ALL_DATA: (x,y,z), {JOINED: {JOINED_AB::x, JOINED_AB::y, JOINED_AB::z, JOINED_AB::A::field1, JOINED_AB::B::field2}, D: {x,y,z,..}}

In other words, COGROUP works like GROUP. GROUP takes members of a single relation, binds tuples together by common fields, and creates a group plus a bag that holds the list of matching tuples. COGROUP takes members of different relations, binds them by common fields, and creates a bag per relation containing the tuples that share those fields. In fact the COGROUP and GROUP operations are the same; it's just common practice to use COGROUP when grouping multiple relations and GROUP when grouping a single relation.

Wednesday, December 1, 2010

Writing a custom PIG Loader

Foreword

I've been pretty happy using the default pig loader, which takes as input the delimiter of a CSV, and loads tuples into memory as specified:

A = LOAD '/csv/input/inputs*' USING PigStorage() AS (field1,field2,..fieldN);

However I'm in the middle of doing some transforms on a csv with ~ 226 fields. Yikes. For most of these transforms, we don't need all 226 fields, in fact we probably only need a reasonable subset, but which reasonable subset depends on what we are trying to do. Ideally I'd like to be able to extract the values I want into a tuple like this:

A = LOAD 'somefile' using CustomLoader(1,4,45,100...) as (timestamp:long, id:long, url:chararray...);

So, it's time to write a custom loader.

Setup

Hadoop
I first installed the CDH3 distro of hadoop -- specifically the pseudo-distributed configuration, which runs all hadoop core services, i.e. namenode, datanode, jobtracker and tasktracker, on a single box. I then installed hadoop-pig. Cloudera makes this easy by leveraging apt-get and installing to the standard *nix hierarchical locations. The CDH3 version of Hadoop uses /etc/alternatives to allow for easy version switching, and logs reside in the usual /var/log location.

sudo apt-get install hadoop-0.20-conf-pseudo

After starting core services as described in the link, I installed CDH3 Pig (version 0.7.0) via apt-get:

sudo apt-get install hadoop-pig

CDH3 Pig installs the pig shell script in /usr/bin, and provides libs in /usr/lib/pig. In order to run the pig shell, you need to set JAVA_HOME to /usr/lib/jvm/java-6-sun.

Mavenization

With the necessary services installed, I set up a maven project, mainly for brainless dependency management:

mvn archetype:create  -DarchetypeGroupId=org.apache.maven.archetypes 
-DgroupId=org.arunxarun.data.prototypes  -DartifactId=pigloader

I then wanted to bring in the pig jars as dependencies. I found hadoop-0.20-core in the mvnrepository, but could not find pig.jar or pig-core.jar in any maven repository. So I installed the pig and pig-core jars to my local repository from the /usr/lib/pig directory where they had been put by the apt-get install. I did that after creating versionless symlinks to the real jars whose names contained version information:

mvn install:install-file -Dfile=/usr/lib/pig/pig-core.jar -DgroupId=org.apache.hadoop -DartifactId=hadoop-pig-core -Dversion=0.7.0 -Dpackaging=jar

mvn install:install-file -Dfile=/usr/lib/pig/pig.jar -DgroupId=org.apache.hadoop -DartifactId=hadoop-pig -Dversion=0.7.0 -Dpackaging=jar


Finally, I made sure that the dependencies were referenced in my pom file:
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-pig-core</artifactId>
  <version>0.7.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-pig</artifactId>
  <version>0.7.0</version>
</dependency>

Implementation

As of 0.7.0, Pig loaders extend the LoadFunc abstract class. This means they need to override 4 methods:
  • getInputFormat() this method returns to the caller an instance of the InputFormat that the loader supports. The actual load process needs an instance to use at load time, and doesn't want to place any constraints on how that instance is created.
  • prepareToRead() is called prior to reading a split. It passes in the reader used during the reads of the split, as well as the actual split. The implementation of the loader usually keeps the reader, and may want to access the actual split if needed.
  • setLocation() Pig calls this to communicate the load location to the loader, which is responsible for passing that information to the underlying InputFormat object. This method can be called multiple times, so there should be no state associated with the method (unless that state gets reset when the method is called).
  • getNext() Pig calls this to get the next tuple from the loader once all setup has been done. If this method returns null, Pig assumes that all information in the split passed via the prepareToRead() method has been processed. 
Here is the current implementation: note that the constructor takes a var arg set of Strings, which is the only kind of argument that can be used with a Pig Loader. Also note from above that RecordReader is set in prepareToRead, but actually used in getNext().

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class CustomLoader extends LoadFunc {

    private static final String DELIM = "\t";
    private static final int DEFAULT_LIMIT = 226;

    private int limit = DEFAULT_LIMIT;
    private RecordReader reader;
    private List<Integer> indexes;
    private TupleFactory tupleFactory;

    /**
     * Pig Loaders only take string parameters. The CTOR is really the only interaction
     * the user has with the Loader from the script.
     * @param indexesAsStrings
     */
    public CustomLoader(String... indexesAsStrings) {
        this.indexes = new ArrayList<Integer>();
        for (String indexAsString : indexesAsStrings) {
            indexes.add(Integer.valueOf(indexAsString));
        }

        tupleFactory = TupleFactory.getInstance();
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();
    }

    /**
     * the input in this case is a TSV, so split it and make sure that the
     * requested indexes are valid.
     */
    @Override
    public Tuple getNext() throws IOException {
        Tuple tuple = null;
        List<String> values = new ArrayList<String>();

        try {
            boolean notDone = reader.nextKeyValue();
            if (!notDone) {
                return null;
            }
            Text value = (Text) reader.getCurrentValue();

            if (value != null) {
                String parts[] = value.toString().split(DELIM);

                for (Integer index : indexes) {
                    if (index > limit) {
                        throw new IOException("index " + index + " is out of bounds: max index = " + limit);
                    } else {
                        values.add(parts[index]);
                    }
                }

                tuple = tupleFactory.newTuple(values);
            }

        } catch (InterruptedException e) {
            // add more information to the runtime exception condition.
            int errCode = 6018;
            String errMsg = "Error while reading input";
            throw new ExecException(errMsg, errCode, PigException.REMOTE_ENVIRONMENT, e);
        }

        return tuple;
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit pigSplit) throws IOException {
        this.reader = reader; // note that for this Loader, we don't care about the PigSplit.
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location); // the location is assumed to be comma separated paths.
    }
}
 
Testing
Testing a Pig UDF requires two steps: basic unit testing and integration testing via a script. I'm including this section because it also shows how the loader is accessed via Pig Latin.

Unit Testing: Mocking the Reader
I've implemented a MockRecordReader that I can pass into my CustomLoader via prepareToRead(). The MockRecordReader will be accessed when getNext() is called. Note that I've only implemented the methods I need. This is by no means a fully functional RecordReader:


import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MockRecordReader extends RecordReader<Long, Text> {

    private BufferedReader reader;
    private long key;
    private boolean linesLeft;

    /**
     * call this to load the file
     * @param fileLocation
     * @throws FileNotFoundException
     */
    public MockRecordReader(String fileLocation) throws FileNotFoundException {
        reader = new BufferedReader(new FileReader(fileLocation));
        key = 0;
        linesLeft = true;
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }

    @Override
    public Long getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        String line = reader.readLine();

        if (line != null) {
            key++;
            return new Text(line);
        } else {
            // no more lines: flag it so nextKeyValue() returns false from now on.
            linesLeft = false;
            return null;
        }
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // don't need this for unit testing
        return 0;
    }

    @Override
    public void initialize(InputSplit arg0, TaskAttemptContext arg1)
            throws IOException, InterruptedException {
        // not initializing anything during unit testing.
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return linesLeft;
    }
}


Implementing unit tests using the MockRecordReader is super easy. Note that I load the MockRecordReader up with some fake data for testing purposes.

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.io.IOException;

import org.apache.pig.data.Tuple;
import org.junit.Test;

public class CustomLoaderTest {

    @Test
    public void testValidInput() throws Exception {
        MockRecordReader reader = new MockRecordReader("src/test/resources/valid1line_hit_data.tsv");

        CustomLoader custLoader = new CustomLoader("0", "2", "4");

        custLoader.prepareToRead(reader, null);

        Tuple tuple = custLoader.getNext();

        assertNotNull(tuple);

        String ts = (String) tuple.get(0);
        assertNotNull(ts);
        assertEquals("1130770920", ts);

        String language = (String) tuple.get(1);
        assertEquals("en-ca", language);

        String someCt = (String) tuple.get(2);
        assertEquals("675", someCt);
    }

    @Test(expected = IOException.class)
    public void testInvalidInput() throws Exception {
        MockRecordReader reader = new MockRecordReader("src/test/resources/valid1line_hit_data.tsv");

        CustomLoader custLoader = new CustomLoader("300");

        custLoader.prepareToRead(reader, null);

        custLoader.getNext();
    }
}

Integration Testing
Now that I know I haven't generated any NPEs from basic usage (NOTE: there are plenty more tests that I could do around bad format), it's integration test time. Integration testing a loader via Pig Latin is pretty simple: load data, then dump it, and validate that it looks like it should. Right now this is manual, basically running the script below, but output could/should be automatically validated.

Note that in order to use the UDF I've written, I need to specifically register it as shown in the first line below.

register '../../../target/CustomLoader-1.0-SNAPSHOT.jar';

-- the loader is fully path specified, and args are passed in using single quotes.
-- the file being loaded exists at the specified location in HDFS

A = LOAD '/test/hit_data.tsv' USING com.foo.bar.CustomLoader('0','2','6','19') AS (zero:long,two:chararray,six:long,nineteen:chararray); 
 
C =  GROUP A BY zero;
 
-- this forces pig to execute the query plan up to the DUMP, which means invoking the loader. 

DUMP C;

-- note that the same loader can be invoked with a different number of arguments, and
-- fields don't have to be cast
-- the file being loaded exists at the specified location in HDFS

B = LOAD '/test/hit_data.tsv' USING com.foo.bar.CustomLoader('100','200');

DUMP B;


Conclusion
Writing the code took about 10 minutes. Testing it took much longer. That seems to be the pattern for me when writing (simple) UDFs. What I've noticed about Pig scripts and UDFs is that in order to validate functionality throughout the script/UDF lifecycle, you always need to inspect the generated tuples to feel confident that changes have been made without regression.

Other than the lack of automation around integration testing, the actual Loader works as advertised -- it might need to change to accommodate new requirements, but it will work just fine for prototypical work with multi-column CSV files.

Wednesday, November 17, 2010

Pull Parsing with STAX

Foreword
Due to some events beyond my control, I lost the source code for parsing my Garmin TCX data into CSV format. I had not gotten around to Git-ifying my source, and also had not backed it up via JungleDisk/Mozy, so the original Ruby code that used the LibXML SAX parser is gone.

That might not necessarily be a bad thing. In SAX parsing, events happen without any surrounding context. It is up to the programmer to supply the context, and doing that in a legible, maintainable way with the SAX event driven model is a challenge. When I had discovered issues with the parsing code, fixing those issues required a lot of time to determine the actual state at the time the bug occurred. SAX parsing code has a significant maintenance penalty.

DOM parsing is much more straightforward because you navigate the structure of the XML, and implicitly get the associated context. Unfortunately, it is prohibitively expensive because it requires that the entire document get loaded into memory.

STAX parsing is a reasonable compromise between the two extremes. It streams the file (i.e. only loading into memory what it needs), while allowing the developer to navigate the structure of the XML. In other words, you have context without memory overhead.

STAX can be used to read or write XML files -- in my application, which converted .TCX files to a CSV format, I was focused on reading XML, not writing it. For reading files, STAX provides two different APIs. The cursor based API uses XMLStreamReader. The iterator based API uses XMLEventReader. The key difference between the two is that the iterator based API treats events as first class objects and allows the user to peek ahead at the next element to be fetched. This supplies the user with more context than the cursor based API. That context comes with additional resource consumption, but still much less than loading an entire DOM into memory.
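To make the peek-ahead difference concrete, here's a minimal sketch (not from the original post) of the iterator based API; the file name is hypothetical:

import java.io.FileInputStream;

import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.events.XMLEvent;

public class EventReaderPeekExample {

    public static void main(String[] args) throws Exception {
        // hypothetical file name; any TCX/XML file works
        FileInputStream fis = new FileInputStream("activity.tcx");
        XMLEventReader reader = XMLInputFactory.newInstance().createXMLEventReader(fis);

        while (reader.hasNext()) {
            // peek() lets us look at the next event without consuming it --
            // something the cursor API (XMLStreamReader) cannot do
            XMLEvent upcoming = reader.peek();
            if (upcoming.isStartElement()
                && upcoming.asStartElement().getName().getLocalPart().equals("Trackpoint")) {
                // we know a Trackpoint is coming before we advance the reader
            }
            reader.nextEvent(); // consume the event
        }
        reader.close();
    }
}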

Onward
In my first STAX program, I wanted to see how far I could get with the cursor based API. Specifically, given its better performance characteristics, would it provide enough context for me to write easily maintainable code?

A Moment...
A quick side note on why (again) I'm choosing to convert an XML stream to CSV. XML is great because DTDs and Schemas provide a way to validate document integrity when there are large numbers of optional elements. In the case of TCX, there are few optional elements -- the elements that exist always contain the same kinds of data. With an unchanging format, CSV makes more sense because it is a more compact representation of a stable data set.

Implementation Details
In my re-implementation of TCX->CSV parsing code, I needed to transform a set of nested parameters into two different CSV formats. In order to explain what I needed to do, I need to go into detail about what the Garmin TCX XML looks like, and what I wanted to extract from it.

<?xml version="1.0" encoding="UTF-8" standalone="no" ?>
<TrainingCenterDatabase xmlns="http://www.garmin.com/xmlschemas/TrainingCenterDatabase/v2" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.garmin.com/xmlschemas/TrainingCenterDatabase/v2 http://www.garmin.com/xmlschemas/TrainingCenterDatabasev2.xsd">

  <Activities>
    <Activity Sport="Biking">
      <Id>2010-11-04T22:02:59Z</Id>
      <Lap StartTime="2010-11-04T22:02:59Z">
        <TotalTimeSeconds>1798.6000000</TotalTimeSeconds>
        <DistanceMeters>15412.0742188</DistanceMeters>
        <MaximumSpeed>12.8037510</MaximumSpeed>
        <Calories>581</Calories>
        <AverageHeartRateBpm xsi:type="HeartRateInBeatsPerMinute_t">
          <Value>140</Value>
        </AverageHeartRateBpm>
        <MaximumHeartRateBpm xsi:type="HeartRateInBeatsPerMinute_t">
          <Value>158</Value>
        </MaximumHeartRateBpm>
        <Intensity>Active</Intensity>
        <Cadence>0</Cadence>
        <TriggerMethod>Manual</TriggerMethod>
        <Track>
          <Trackpoint>
            <Time>2010-11-04T22:03:00Z</Time>
            <Position>
              <LatitudeDegrees>47.5834731</LatitudeDegrees>
              <LongitudeDegrees>-122.2491668</LongitudeDegrees>
            </Position>
            <AltitudeMeters>17.4411621</AltitudeMeters>
            <DistanceMeters>26.8590393</DistanceMeters>
            <HeartRateBpm xsi:type="HeartRateInBeatsPerMinute_t">
              <Value>85</Value>
            </HeartRateBpm>
            <SensorState>Absent</SensorState>
          </Trackpoint>
          ...
        </Track>
      </Lap>
      ...
    </Activity>
    ...
  </Activities>
</TrainingCenterDatabase>


I want to extract two kinds of data out of the XML stream above:
  1. Lap summary data. Lap summary data is good for high level comparisons of effort. The basic components of Lap summary data of the same general duration can be compared across laps.
  2. Trackpoint data. Trackpoint data -- elevation, heart rate, lat/long can be analyzed/transformed across arbitrary intervals to measure input effort and output speed.
Lap Summary Data will look like this:
activity_id,lap_id,total_time,total_distance,max_speed,total_calories,average_heartrate,max_heartrate
Trackpoint detail data will look like this:
lap_id,trackpoint_id,time,latitude,longitude,altitude,distance,heartrate

Both files may end up being used when correlating track points to their parent laps and activities.

Initializing the STAX Parser
I've created a class TCXPullParser to parse Garmin TCX data. In the constructor I initialize the STAX parser:

XMLStreamReader parser;
.....

/**
 * ctor, with all file names to write to and read from.
 * 
 * @param lapSummaryWriter
 * @param trackDetailWriter
 * @param fileToParse
 * @throws IOException
 * @throws XMLStreamException
 */
public TCXPullParser(CSVWriter lapSummaryWriter, CSVWriter trackDetailWriter,
    String fileToParse) throws IOException, XMLStreamException {

  FileInputStream fis = new FileInputStream(fileToParse);
  XMLInputFactory factory = XMLInputFactory.newInstance();
  parser = factory.createXMLStreamReader(fis);
  ...
}

Initialization is simple: the parser is created from the XMLInputFactory and takes the FileInputStream created on the input file name. From this point my primary access to the file is through the parser object. I use the parser object to advance the cursor (by calling next()), inspect the type of element (via the return value of parser.next()), and grab text (parser.getText()). These three methods, with some additional functionality I've added, give me enough context to actually top-down parse the XML.

Pull Parsing
One advantage of using pull parsing is that the code is in charge of when events are fired. This lets us do things like skip processing/move directly to an element that we are interested in by using the parser.next() method and checking the returned element type:

/**
 * skips parser to the start element of the specified element name, while
 * stopElementName has not been encountered.
 * 
 * @param parser
 * @param elementName
 * @param stopElementName
 *          if we get this far, we've gone too far.
 * @return true if element is found
 * @throws Exception
 */
protected boolean skipTo(XMLStreamReader parser, String elementName,
    String stopElementName) throws Exception {
  boolean found = false;
  while (parser.hasNext()) {
    int parseType = parser.next();
    if (parseType == XMLStreamReader.START_ELEMENT) {
      String elName = parser.getLocalName();
      if (elName.equals(elementName)) {
        found = true;
        break;
      } else if (elName.equals(stopElementName)) {
        // in the case where we are looking across parallel elements
        // or into a container element,
        // stop when we find the stop element
        found = false;
        break;
      }
    } else if (parseType == XMLStreamReader.END_ELEMENT
        && parser.getLocalName().equals(stopElementName)) {
      // in the case where we are looking within a container element, stop
      // when we reach the end of that container element
      found = false;
      break;
    }
    // other event types (CHARACTERS, COMMENT, etc.) are skipped;
    // getLocalName() is only valid on element events.
  }
  return found;
}

I typically use skipTo() to move to the next instance of an element, before its containing element's end tag is reached. For example, when I'm parsing the contents of a Trackpoint tag:

<Trackpoint>
  <Time>2010-11-04T22:03:00Z</Time>
  <Position>
    <LatitudeDegrees>47.5834731</LatitudeDegrees>
    <LongitudeDegrees>-122.2491668</LongitudeDegrees>
  </Position>
  <AltitudeMeters>17.4411621</AltitudeMeters>
  <DistanceMeters>26.8590393</DistanceMeters>
  <HeartRateBpm xsi:type="HeartRateInBeatsPerMinute_t">
    <Value>85</Value>
  </HeartRateBpm>
  <SensorState>Absent</SensorState>
</Trackpoint>


This is the code to parse that data:

/**
 * parses a single trackpoint and writes an output line to the trackDetail CSV
 * writer.
 * 
 * @param parser
 * @throws Exception
 */
protected void parseTrackPoint(XMLStreamReader parser, String lapId,
    String trackPointId) throws Exception {
 trackDetailWriter.writeArg(lapId);
 trackDetailWriter.writeArg(trackPointId);
 skipTo(parser, TIME, TRACKPOINT);
 trackDetailWriter.writeArg(getTimeValue(parser, parser.next()));
 skipTo(parser, LAT, TRACKPOINT);
 trackDetailWriter.writeArg(getValue(parser, parser.next()));
 skipTo(parser, LONG, TRACKPOINT);
 trackDetailWriter.writeArg(getValue(parser, parser.next()));
 skipTo(parser, ALT, TRACKPOINT);
 trackDetailWriter.writeArg(getValue(parser, parser.next()));
 skipTo(parser, DIST, TRACKPOINT);
 trackDetailWriter.writeArg(getValue(parser, parser.next()));
 skipTo(parser, HEARTRATE, TRACKPOINT);
 trackDetailWriter.writeArg(getValue(parser, parser.next()));
 trackDetailWriter.flushArgs();
}

With skipTo in place, I needed to extract the data from the XML. The text inside of elements is CHARACTER data, and is accessed by calling parser.next() after hitting the enclosing tag's START_ELEMENT. The CHARACTER data is accessed via XMLStreamReader.getText():

/**
 * extracts a double value from a character stream.
 * 
 * @param parser
 * @param parseType
 * @return the double, or -1 if the element is not CHARACTERS. Will also
 *         throw a runtime exception if parsing fails.
 */
private double getValue(XMLStreamReader parser, int parseType) {
 if (parseType == XMLStreamConstants.CHARACTERS) {
  return Double.parseDouble(parser.getText());
 } else {
  return -1;
 }
}


The combination of skipTo() and getValue() allows me to extract Double values from the XML. I'm using Double to validate the format of the value, even though I'm going to persist that value back as a string. When extracting timestamps, I extract the data to a long:

private long getTimeValue(XMLStreamReader parser, int parseType)
    throws ParseException {
 if (parseType == XMLStreamConstants.CHARACTERS) {
  String raw = parser.getText();
  String date = raw.substring(0, raw.indexOf('T'));
  String time = raw.substring(raw.indexOf('T') + 1, raw.indexOf('Z'));
  // raw looks like 2010-11-04T22:03:00Z, so the date portion is yyyy-MM-dd
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
  Date actual = sdf.parse(date + '-' + time);
  return actual.getTime();
 } else {
  return -1;
 }
}

Separating Reading XML from Writing CSV
In the code above there are calls to a trackDetailWriter object. I chose to separate the writing of the CSV from the reading of the XML in order to test the XML reading logic more easily. This simplified things a lot: it allowed me to pass in CSVWriter objects, it relieved the parsing code of having to manage/open/close destination files, and it allowed me to write test implementations of CSVWriter that stored the data in memory for me to check during unit tests.

public interface CSVWriter {

 /**
  * write a single arg
  * @param arg
  * @throws Exception
  */
 public void writeArg(Object arg) throws Exception;
 
 /**
  * flush all pending args as a single CSV line
  * @throws Exception 
  */
 public void flushArgs() throws Exception;
 
}

The default implementation (used at runtime) looks like this:

/**
 * 
 * @author Arun Jacob
 *
 * push comma separated values to a file. 
 */
public class DefaultCSVWriterImpl implements CSVWriter {

 private FileWriter writer;
 private StringBuffer buffer;
 
 public DefaultCSVWriterImpl(String fileName) throws IOException {
  writer = new FileWriter(fileName);
  buffer = new StringBuffer();
 }
  
 /**
  * close the file: REQUIRED for all file writers.
  * @throws Exception
  */
 public void close() throws Exception {
  writer.flush();
  writer.close();
 }

 @Override
 public void writeArg(Object arg) throws Exception {
  buffer.append(arg);
  buffer.append(",");
 }

 @Override
 public void flushArgs() throws Exception {
  writeToFile(buffer);
 }

 /**
  * flush the contents of the buffer to file
  * @param buffer
  * @throws IOException
  */
 private void writeToFile(StringBuffer buffer) throws IOException {
  if(buffer.charAt(buffer.length()-1) == ',') {
   // remove the last comma before writing. 
   writer.write(buffer.toString().substring(0,buffer.length()-1));
  } else {
   writer.write(buffer.toString());
  }
  // terminate the line so each flush produces its own CSV row
  writer.write("\n");

  resetBuffer(buffer);
 }

 /**
  * clear out the StringBuffer
  * @param buffer
  */
 private void resetBuffer(StringBuffer buffer) {
  if(buffer.length() > 0) {
   buffer.delete(0,buffer.length());
  }
 }

}

The test implementation (used to verify that values are being pulled from the XML correctly) looks like this:

public class TestTrackPointCSVWriter implements CSVWriter {
 
 static final String TRACKPOINTID = "TrackPointId";

 static final String LAPID = "LapId";

 static final String ACTIVITYID = "activityId";

 List<Object> args;
 Map<String,Object> argsMap;
 public TestTrackPointCSVWriter() {
  args = new ArrayList<Object>();
  argsMap = new HashMap<String,Object>();
 }
 

 @Override
 public void writeArg(Object arg) throws Exception {
  args.add(arg);
  
 }

 @Override
 public void flushArgs() throws Exception {
  argsMap.put(LAPID, args.get(0));
  argsMap.put(TRACKPOINTID, args.get(1));
  argsMap.put(TCXPullParser.TIME, args.get(2));
  argsMap.put(TCXPullParser.LAT,args.get(3));
  argsMap.put(TCXPullParser.LONG, args.get(4));
  argsMap.put(TCXPullParser.ALT, args.get(5));
  argsMap.put(TCXPullParser.DIST, args.get(6));
  argsMap.put(TCXPullParser.HEARTRATE, args.get(7));
  args.clear();
  
 }

 
 /**
  * validation method
  * @param key
  * @return
  */
 public Object get(String key) {
  return argsMap.get(key);
 }

}

Conclusion
I'm not sure what I was thinking when I wrote the original SAX parser for TCX data, other than I just like to write in Ruby. The additional context that I get by being able to pull tags instead of getting them pushed at me makes the code much easier to follow and therefore maintain. 

Friday, September 24, 2010

Datamining my GPS/HRM Data, Step 2: Pig for ETL

Overview
Now that I've extracted 3 years worth of GPS/HRM data into CSV format, I want to get some basic summary information. Specifically, I want
  • the total distance covered for running, biking and 'other' (usually skate skiing) per month. 
  • average run and ride mileage per month
  • average running and cycling pace per month
Pretty no brainer stuff that I could probably write a quick 'n dirty ruby script to do for my paltry 40MB of data. 

But...I want to use Pig. While this may seem like a candidate for the "Cutting Butter With A Chainsaw" award, I'm actually trying to show that Pig Scripting is pretty damn useful for ETL. The nice thing about Pig is that when I'm dealing with 40GB or TB of input data, the same script can be run across a multi node cluster without changes to the basic logic.  

I'm also going to try and capture what I've learned from being in the trenches with Pig over the last couple of months. We use it at work to process TBs of log data, and while it definitely has its warty aspects, I find that it is well suited for basic summary, grouping, and filtering operations.

Pig Setup
I downloaded and set up Pig as described here. Pig assumes that you've got Hadoop installed, and specifically DFS started, as well. I start DFS from the bin directory of my hadoop install: start-dfs.sh.
I'm running Pig in pseudo-local mode (with my local box set up as a single node hadoop cluster):

pig -f pigfile.pig

I symlinked the pig shell script to /usr/bin/pig for ease of use. I have HADOOP_HOME and JAVA_HOME defined; the pig shell script uses both, and assumes that it is located in a bin directory in a standard pig install.

Pig Latin, UDFs, and Tuples
Pig scripts are written in Pig Latin. I'm going to cover a small subset of Pig Latin in this script, see full references part I and II for comprehensive overviews. Where Pig Latin falls short, Pig has User Defined Functions that allow loading, storing, and transforming data. UDFs are written in java and extend defined interfaces. 

Pig Latin and UDFs manipulate Tuples to generate sets of Tuples. Tuples are groups of 1..N values, where a value can be an int, a float, a string, another tuple, or a bag (set) of values. In Pig, you see a lot of 
   X = {do something in pig on} Y;

where Y is an original set of Tuples and X is the transformed result. Transformations of Data are done as Map-Reduce jobs. The complexity of what you are transforming determines how many map-reduce jobs are run.

The Goods

Load the data.


I'm loading the detail files generated from the XML->csv transform I did last time. I loaded all detail files into pig using the default PigStorage() UDF. In order to use this UDF, I had to specify the format of all fields in each row:
A = LOAD '/csv/input/details*' USING PigStorage() AS (activityId:chararray, lapId:chararray, time:chararray, latitude:float, longitude:float, alt:float,dist:float,hr:int);

This gives me a tuple A that contains data as specified above. Note that this tuple represents all of the rows of loaded data. Also note that I used a wildcard to load all files in at the same time. PigStorage() also allows the user to separate locations by comma, which is really great when loading from multiple locations.

Note that the locations above are in HDFS.  I uploaded all of my data into HDFS using the hadoop fs -put command.

Extract the month from the timestamp using a custom UDF.


Right now I've got the timestamp formatted as a character array. I'm going to need to extract month data and add it to the list of tuple fields in order to get monthly averages. This is where the simple functionality of the pig script is supplanted by a UDF.

UDFs, as mentioned before, are used to load, store, and transform/extract data. I'm going to write one that extracts the month as an integer between 1-12  from the time string. Note in the listing below I broke out the actual formatting into a separate function that I could unit test w/o having to create a Tuple instance.


package com.infovoracious.udfs;

import java.io.IOException;
import java.util.Calendar;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class GetMonth extends EvalFunc<Integer> {

  private static final int TIME_INDEX = 2;

  @Override
  public Integer exec(Tuple input) throws IOException {
    if (input == null || input.size() == 0)
      return null;
    try {
      // looks like this: 2010-02-24T14:22:29Z
      String str = (String) input.get(TIME_INDEX);
      if (str == null) {
        return -1;
      }
      return DateUtils.dateFromFormattedTime(str, Calendar.MONTH) + 1;
    } catch (Exception e) {
      throw new IOException("Caught exception processing input row ", e);
    }
  }
}
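The DateUtils.dateFromFormattedTime() helper referenced above isn't shown in the post. A minimal sketch of what it might look like, assuming it simply parses the TCX timestamp and hands back the requested Calendar field (the class and method names follow the usage above, but the implementation is my guess):

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;

public class DateUtils {

  /**
   * Hypothetical helper: parses a TCX timestamp like 2010-02-24T14:22:29Z
   * and returns the requested Calendar field (e.g. Calendar.MONTH).
   */
  public static int dateFromFormattedTime(String formatted, int calendarField)
      throws ParseException {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
    Calendar cal = Calendar.getInstance();
    cal.setTime(sdf.parse(formatted));
    return cal.get(calendarField);
  }
}

Keeping the parsing in a plain static method like this is what makes it easy to unit test without building a Tuple.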

I also created a UDF to generate the week of the month as well. I'm storing both the month and the week of the month as additional columns.

Load the UDFs.

In order for my script to see the UDFs I have just written, I need to load the jar that the UDFs reside in, and alias the methods so that they can be called in the script:

This is done by using the REGISTER keyword at the top of the script to load the jar, and the DEFINE keyword to alias the UDFs. Note that when I define a UDF, I'm specifying its constructor. In the cases below, GetMonth and GetWeekOfMonth both have default (no parameter) constructors.

REGISTER foo.jar;
DEFINE extract_month com.infovoracious.udfs.GetMonth();
DEFINE extract_week com.infovoracious.udfs.GetWeekOfMonth();

I invoke the methods like this:

-- the load from last time
A = LOAD '/csv/input/details*' USING PigStorage() AS (activityId:chararray, lapId:chararray, time:chararray, latitude:float, longitude:float, alt:float,dist:float,hr:int);

-- adding columns to a tuple.
B = FOREACH A GENERATE *, extract_month(time) as month , extract_week(time) as week;


Group by month and week of month.
Now that we have the columns, we can group by them. Grouping by (month, week) bins values by unique month/week-of-month pairs:

C = GROUP B BY (month,week);
This creates a tuple that looks like this:
{month, week}, array length 1..N of {activityId,lapId,time,latitude,longitude,alt,dist,hr}
actual values would look like this:

{1,1},[{..},{..},..]
{1,2},[{..}...]
...

Summarize data.
With columns now grouped, I can summarize data from the grouped tuples. I want to summarize mileage per week. I can do that as follows (showing the whole script for continuity)


-- the load from last time
A = LOAD '/csv/input/details*' USING PigStorage() AS (activityId:chararray, lapId:chararray, time:chararray, latitude:float, longitude:float, alt:float,dist:float,hr:int);

-- adding columns to a tuple.
B = FOREACH A GENERATE *, extract_month(time) as month , extract_week(time) as week;

C = GROUP B BY (month,week);

-- the tuple now looks like group(month,week) B(tuple contents), 
-- so I reference dist as a member of the B tuple.

D = FOREACH C GENERATE *, SUM(B.dist) as total_dist;


Flatten the grouping columns.
Using the FLATTEN keyword to remove tuple nesting has different effects depending on where you use it. If you use FLATTEN to flatten the grouped columns, it merely removes the bag from around the grouped columns:
{month, week}, [array of tuples]
becomes
month, week, [array of tuples].
Flattening out the array generates a new row for each array element:
1,2,[{a,b},{c,d}]
becomes
1,2 ,a,b
and
1,2,c,d
In this case I want to flatten out the grouping columns:

-- the load 
A = LOAD '/csv/input/details*' USING PigStorage() AS (activityId:chararray, lapId:chararray, time:chararray, latitude:float, longitude:float, alt:float,dist:float,hr:int);

-- adding columns to a tuple.
B = FOREACH A GENERATE *, extract_month(time) as month , extract_week(time) as week;

-- grouping
C = GROUP B BY (month,week);

-- summing (and discarding what I don't need)
D = FOREACH C GENERATE group, SUM(B.dist) as total_dist;

-- flattening:
-- note how I'm referring to the grouping columns by the 'group' special keyword.
-- note how total_dist is not scoped by an enclosing tuple b/c of the way it was generated above.
E = FOREACH D GENERATE FLATTEN(group),total_dist;

Store it.
Finally, I want to store the results of my work. This is done using the default PigStorage() store UDF:

STORE E into '$some_dir';

Note that I didn't need to specify 'using PigStorage()'. I also used a variable, which I pass into the pig script as a key-value pair, like this:

pig -p some_dir=foo...

And that's it! I've summed my mileage from each week per month over the last three years: in six lines of script!

Conclusion
Pig is a powerful tool to clean up and perform basic operations on data.  When you know what you need to do, and have to do it on a lot of data, it works remarkably well. That said, there are a couple of things that start to hurt over time:
  1. When you write a lot of Pig, you start wanting to re-use sections of script, i.e. make them functions. Except you can't. Yet. So you end up either cutting and pasting (bad), or saving variables off for later use (confusing when you try to use them later), instead of writing script based UDFs (preferred). 
  2. There is no conditional execution of an operation. I can't test a variable state and then execute one or another statements based on that variable. What this means in real world use is that I do all of my conditional testing in bash, prior to invoking the pig script, and build up variables based on the results of those conditional tests. Those variables then get accessed by operations.  
  3. The default pig serialization format is great for the data I was using above because it had no commas which are the default Pig serialization format delimiter. You can change the delimiter, but that begs the question of what happens when someone has injected your delimiter into the data you are trying to process. 
  4. Higher level workflow is not solved with Pig. We started out running pig via a set of cron jobs, which soon turned into an admin's worst nightmare. Besides just being hard to maintain, we were also not taking advantage of the actual sources and destinations of pig data to schedule the work. In other words, if Pig job A produced intermediate format B and then continued running, we had no way of starting Pig Job C that loaded format B until Pig job A completed.  We are currently evaluating oozie at work to see if it can improve on cron+bash (a low bar, I know).
Warts and all, Pig sure beats writing map reduces for simple grouping/aggregation functionality. We still write map reduces when we are doing more complex operations, and our ETL job flows end up looking like a combination of pig, raw map-reduce, and HDFS access. But Pig really allows us to get away from a lot of boilerplate work (and maintenance!) for processing the data. Which gives us more time to analyze it.

Next
Of course, just getting summary statistics on my data hasn't really scratched the itch that started me off in the first place. While I now have an idea of my general fitness month to month, I still don't know enough about the amount of hard versus easy running/cycling I did, or how fast I was going when I was going hard or easy. Answering those questions will help me determine my fitness, which I previously defined as heart rate vs pace, accounting for terrain. In order to get a metric for that definition of fitness, I need to define what 'hard' and 'easy' really are for me, and analyze how specific values of those workouts have changed for the better or worse over time. 

Tuesday, September 7, 2010

Data Mining My GPS/HRM Data: Step 1, Formatting the Data

I've been wanting to analyze the data from my Garmin 305 for a while now. I've been a casual runner/biker/hardcore data geek for a while now, and last year I started doing triathlons, which means even more data to analyze. While I've always been curious, I just haven't had a great 'need' to analyze it until now...

I'm switching from a primarily heart rate based training program to a 'pace based' training program. The former had me training within specific heart rate zones, the latter has me running at specific pace ranges. I'm a 'measurer', and I'm curious to see how effective (or not!) the pace based training program is.  Fortunately I have one device that tracks both heart rate and speed, and I can analyze the effect that the pace based training has relative to the effect that the heart rate based training has on my overall fitness.

In this case I'm measuring fitness as a combination of heart rate vs terrain covered, i.e. hilly vs flat, vs pace. In order to measure my fitness up to now and going forward, I need to answer the following questions:
  1. how much time did I spend in 'recovery' mode, where my heart rate was < 70% max?
  2. how much time did I spend in 'pain cave' mode, where my heart rate was > 85-90% max?
  3. how much faster (or slower) did I get at the same heart rate over the last year? 
  4. for the new pace based program, how much time am I spending at the different paces, i.e. recovery pace, base pace, marathon pace, 1/2 marathon pace, 10k pace, 5k pace, 1 mile pace? 
  5. what is my average heart rate for those paces? 
  6. how much faster (slower) am I getting? 
I'm hoping to answer these questions using several approaches and several technologies that I've been using at work, and others that I've been itching to try. 

The first thing I needed to do prior to doing any analysis was to format the data into a format that I could easily operate on. The data is exported from the device into a format called tcx, which is a schema-validated XML. 

I need the data in csv format, mainly because the tools I want to process the data with are all hadoop based, and while I've read that it is not only possible, but easy, to process XML with hadoop,  hadoop works best with csv formats. XML is a nice format for nested data, and this is nested data, with the following structure
  • activities
    • activity
      • laps
        • lap -- contains summary averages from trackpoint data (see below)
          • trackpoints
            • trackpoint -- contains snapshot heart rate, altitude, distance, etc.
XML is especially good when there are optional attributes. CSV tends to suck with optional attributes, because nothing is optional. In this case none of the attributes are optional, so ultimately XML is overkill for storing this data.

I collapsed this structure into two csv lists: summary data and detail data, because I planned to act on summary and detail data separately. 

The summary data contains lap summary data:
  • activity id, lap id, total time, total distance, max speed, max heart rate, average heart rate, calories, number of trackpoints.
The detail data contains trackpoint data:
  • lap id,time, latitude, longitude, altitude, distance, heart rate
To do the conversion, I used the Ruby LibXML SAX parser. In order to use the LibXML SAX parser, I needed to create a callback handler that implemented the methods I wanted to override.


class PostCallbacks
  include XML::SaxParser::Callbacks

  def on_start_element_ns(element, attributes, prefix, uri, namespaces)
  ...
  end


  def on_characters(chars)
  ...
  end


  def on_end_element_ns (name, prefix, uri)
  ...
  end
end

In the callback handler, I maintained state to track nested XML objects. Typically I would assign state in the on_start_element_ns() method, act on that state in the on_characters() method, and release state in the on_end_element_ns() method. I would also flush my results to disk occasionally to avoid taking up an unreasonable amount of memory.

I had about 40 Meg of data from the previous 3 years, which was parsed into csv files in approximately 44 seconds. I'm more than happy with that performance right now, because this is essentially a one-off job to get the data.

Next Up: setting up a data processing pipeline using Pig.

Friday, August 27, 2010

Beware the Emerging God Object

Most God Objects are pretty obvious. Bloated, side effect filled, their existence guaranteed by collective fear about refactoring them to a state where their previous functionality is not reproducible.

The God Object that derailed me for the last couple of days was not bloated, and its functions (for the most part) did not have side effects. It did a lot, but did not seem to be overwrought. In fact, I'm pretty sure it only had a minor God Complex until I started refactoring it!

As a software engineer, my goal is to produce the most effective, robust software with the least amount of effort. So, in an attempt to not repeat last week's thrash-fest, I'm going to catalog the warning signs, the resolution, and the corrected approach, because I'm pretty sure I'm going to be in this situation again.

The Task:
We need to make an existing prototype table driven. The prototype functionality configured and ran A/B tests. Its configuration data was currently hardcoded into an enum.

Warning Signs Before We Even Started:
  1. The core logic of the A/B test configuration  resided in a single object.
  2. Despite having several distinct (logical) sub components, this object was responsible for serializing itself and those sub objects.
  3. This object existed in both our configuration time and runtime environments, which have significant differences, the biggest one being that at configuration time, all objects are read/written to a relational database, and at runtime, they are read from a BDB store. Most conceptual objects in the system have config time and runtime implementations.
  4. We could not get rid of this object because it was still being used in production. We would have to implement a phased approach to upgrading it. 
  5. I did not know the codebase well. 
  6. Our sprints are a week, so we felt compelled to fit an end-to-end task into a week. 
The first four warning signs are simply facts. The last two are things that I should pay more attention to. If I don't know the codebase well, and I'm trying to make significant improvements in a week, my chances of success go way down. I chose to ignore that reality.

Warning Signs Once We Started:

  1. The table driven logic did not mesh well with the existing logic. The developer who had written the code had serialized all sub objects into a JSON string. The object was then serialized into a BDB file as a set of attributes and the JSON string, not an object graph. Trying to work with the new table data (an object graph) meant that:
  2. Any change I made to the code had side effects. This is because I was trying to preserve the original signature of the object to maintain backward compatibility with the rest of the system. This became obvious when:
  3. The original code, fairly straightforward, became conditional-heavy. Instead of acknowledging the additional complexity, 
  4. I was choosing to 'hack' more because I knew that we would have time to rework the code. I knew I was hacking because
  5. The developer I was working with started to get more confused with every checkin I made. 

Do Over!
When his brow started cramping up from being so furrowed, I realized that things were wrong. We grabbed a whiteboard and went to work, depicting the original workflow, how adding table driven configurations changed that workflow, and how to change the existing workflow as little as possible to reduce the amount of rework that we would have to do in the runtime portion of the code.  Most of the good ideas came from our dev lead, who had been silently (and then not so silently) observing us trying to make sense of it all. I mention that because he was operating at a distance from a lot of the complexity, and his solution avoided most of it. I felt that he out of all of us was able to make the necessary leap out of the stew of constraints and details, and reframe the problem in a way that made it a much less complex problem. That's the genius of software. That's what I do, but only sometimes. That's why I'm writing this down.

The Problems and Their Solutions: 

  1. The old functionality contained both runtime behavior and configuration data. We chose to separate the two, resulting in a POJO configuration data object used at configuration time, and an 'action' object that knew how to initialize itself from one of those data objects and contained the runtime behavior (see the sketch after this list).
  2. More separation of concerns: The old way of storing data in a JSON string and the new table driven way of storing data did not belong together in the same object. Even though we separated the old object into a data object and an 'action' object, we chose to keep the old data separate from the new data, and access both through getters/setters, which insulated us from the underlying implementations. 
  3. Composition: because we didn't have enough time to rewrite the infrastructure that wrote the configuration object into a BDB, we needed the new data to be persisted into the configuration object as part of the JSON string. Fortunately, once we separated out the data from the configuration object, we could compose the JSON string outside of the configuration object using the POJOs, then set it prior to serializing the configuration object to the BDB file. 
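Since none of the real code appears in this post, the following is only a rough sketch of the shape this separation takes; every name in it (ABTestConfig, ABTestAction, toJson) is hypothetical:

// Hypothetical configuration-time POJO: pure data plus a JSON representation
// that gets composed outside the legacy configuration object.
public class ABTestConfig {

    private String name;
    private int trafficPercentage;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getTrafficPercentage() { return trafficPercentage; }
    public void setTrafficPercentage(int trafficPercentage) { this.trafficPercentage = trafficPercentage; }

    public String toJson() {
        return "{\"name\":\"" + name + "\",\"trafficPercentage\":" + trafficPercentage + "}";
    }
}

// Hypothetical runtime 'action' object: initialized from the config POJO,
// owns the runtime behavior, knows nothing about serialization.
class ABTestAction {

    private final String name;
    private final int trafficPercentage;

    ABTestAction(ABTestConfig config) {
        this.name = config.getName();
        this.trafficPercentage = config.getTrafficPercentage();
    }

    boolean isInTest(long userId) {
        // runtime-only decision logic lives here, not in the config POJO
        return (userId % 100) < trafficPercentage;
    }
}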

Conclusions:
The warning signs before we started played out into more significant problems before they were fully resolved. And, when I think back on the process, I realize I was uncomfortable with the changes I was making, but I chose to push on. Because time was relatively compressed, I found myself making 'poor choices', which ultimately backfired. Next time (these are notes to myself, the 'you' I'm talking to is 'me' :)

  1. Acknowledge risks up front. Not a lot of time to get it done? That's a risk. Don't know the codebase? That's another risk (assuming the codebase is non trivial). 
  2. Examine the old code and determine if there are concerns that should be separated before anything else is done. 
  3. Determine the scope of those changes to see if they can be met in the desired time frame.
  4. If not, break the project into smaller chunks. 
  5. If it feels easy it's right. If it feels hard, it's less right. Be more right. 
  6. Do not hack. When you find yourself hacking, stop. Back up and revisit initial assumptions. Step away from the problem.
  7. Discuss ideas often, with others. If someone else can understand it easily, it can't be that bad. If, on the other hand, their brow starts to wrinkle, and your blood pressure starts to go up because there are not words in the English language to describe what you need to describe, it might just be worse than you think. 
More Conclusions: 

  1. God objects never started out as God Objects. They started out as high level concepts -- "oh, I need something that does X", and mutate because of bolted on additional functionality. 
  2. Most developers don't set out to create a God Object. In this specific case I was trying to save effort by reducing the amount of change to the original code. However, the additional effort to explain and understand my 'bolt ons' was costing real time and effort. 
  3. The earlier you de-factor complexity out of a God Object, the better. Decomposing layered functionality into discrete objects makes it really easy for other people to recognize what those objects are for, and not overlay them with functionality that addresses other concerns. 
  4. Conversely, the longer you wait, the harder it is. God Objects tend to have code with lots of side-effects and implicit assumptions. Refactoring those successfully is hard, even with comprehensive unit tests. 



Wednesday, April 21, 2010

Synchronization Redux

In my last post, I discussed how I had refactored the synchronization around some data structures to reduce the need to lock those data structures except when necessary. My solution was cleaner in that the synchronization was localized, but the actual synchronization was still a little bit hairy in that it required a recheck inside the lock:

int newVersion = specialFooCache.getCacheVersion();

// we only want one thread to refresh the cache, and we
// want the other ones to keep using the old data structures.

if(newVersion != cacheVersion && inProgress == false) {
 synchronized(lock) {
  // first thread is in b/c cacheVersion has not been updated yet.
  // All other threads evaluate to false.
  int checkVersion = specialFooCache.getCacheVersion();
  if(checkVersion != cacheVersion) {
   // ... reload the data structures and update cacheVersion ...
  }
 }
}

But I was (a) stuck and (b) had other code to refactor under the gun. So I didn't think about it much until today in the code review. I'm really glad I code reviewed with John, because he is a relentless simplifier who lives by the 'less code' motto.

Right away I could tell that the whole recheck thing wasn't sitting well with him. The more I explained, the more concerned he looked, until he spotted a potential race condition between threads that might have piled up outside the synchronized block when the cache version had incremented while the cache was being reloaded. These threads would immediately reload even when they didn't have to, which didn't affect the integrity of the in memory data structures, but was definitely a bad use of CPU. He also was insistent that there had to be an easier way, and under his direction this is that easier way:

int newVersion = 0;

// lock is shorter
synchronized(lock) {
 newVersion = specialFooCache.getCacheVersion();

 // leave right away if in progress
 if(cacheVersion == newVersion || inProgress == true) {
  return;
 }

 inProgress = true;
}

try {
 // load from cache

} catch (Exception e) {
 ...
} finally {
 synchronized(lock) {
  // set the version.
  cacheVersion = newVersion;
  // we are done here.
  inProgress = false;
 }
}

A couple of things to note:
(1) This solution is definitely easier and more performant in addition to being correct. The lock is not held during the reload, and when it is released other threads will exit the function immediately if a cache rewrite is in progress.
(2) The key thing that I learned from watching John solve the problem is that he wasn't happy until it was dead simple. When I don't take that approach, or abandon it under pressure, there are some potentially costly ramifications. The original solution had a complexity smell around it that I chose to ignore. The problem with ignoring that smell is that it comes back to haunt you after you've forgotten why you wrote the code in the first place. The solution is, of course, to write code that is simple enough to re-understand months from now, or simple enough for anyone not familiar with your code to understand.
(3) The lock at the bottom is not actually necessary, since (a) the variables being set are volatile and (b) only one thread will get this far. However, since only one thread gets this far, it is not a performance issue, and it actually lends some clarity to the resetting of the test conditions.
(4) I still feel like this is a bit of a work in progress... I'm not convinced that the lock in (3) is really required (even though we both signed off on it). But I do feel a lot better about the current algorithm. That said,
(5) I love code reviews. The knowledge transfer is huge. I'm always looking for ways to strip my code down, and the insight I get from a good reviewer (like John) raises my game tremendously.
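
For completeness, here's a minimal, self-contained sketch of the pattern we ended up with. The class, field, and cache names below are placeholders of mine, not the real code:

import java.util.HashMap;
import java.util.Map;

// stand-ins for the real domain object and the cache being polled
class Foo { }

interface SpecialFooCache {
 int getCacheVersion();
 Map<String, Foo> loadAll();
}

public class CachedFooIndex {

 private final Object lock = new Object();
 private final SpecialFooCache specialFooCache;

 // volatile, so reader threads see the new references as soon as they are assigned
 private volatile Map<String, Foo> foosById = new HashMap<String, Foo>();
 private volatile int cacheVersion = 0;
 private volatile boolean inProgress = false;

 public CachedFooIndex(SpecialFooCache specialFooCache) {
  this.specialFooCache = specialFooCache;
 }

 public Foo getById(String id) {
  checkExpired();
  return foosById.get(id); // readers never block on the reload itself
 }

 protected void checkExpired() {
  int newVersion;
  // short lock: decide whether this thread is the one doing the reload
  synchronized(lock) {
   newVersion = specialFooCache.getCacheVersion();
   if(cacheVersion == newVersion || inProgress) {
    return; // nothing changed, or another thread is already reloading
   }
   inProgress = true;
  }

  try {
   // rebuild outside the lock, then swap the reference in a single assignment
   foosById = specialFooCache.loadAll();
  } finally {
   synchronized(lock) {
    cacheVersion = newVersion;
    inProgress = false;
   }
  }
 }
}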

Thursday, April 15, 2010

Synchronization Requirements

Recently I was lucky enough to pull the short straw and refactor the dreaded FooManager (name changed to protect the innocent). FooManager was a horrendously complex, overwrought class that suffered from at least two God Complexes, and my team had been playing hot potato with it for the last couple of months, since we learned we had inherited it.

My mission was to make FooManager understandable/maintainable without requiring heavy ingestion of psychotropic substances or becoming suicidal. After doing the usual de-Godification and Cleverectomy procedures, i.e. choosing decent abstractions, removing static methods, etc, I was left with a fundamental synchronization dilemma.  It was far from the most annoying part of FooManager, but it was definitely the most interesting to refactor.

FooManager received updates for each specific Foo subclass from different caches. It registered for those updates using a callback interface, and the cache notified FooManager whenever it changed. This seemed like an innocent enough pub/sub pattern. But it made managing the data structures in FooManager -- the ones that held pre-computed indexes of Foos by name, Foos by id, Foos by other id, etc -- tricky.
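
The wiring looked roughly like the sketch below; the interface and method names here are illustrative guesses, not the actual API:

import java.util.ArrayList;
import java.util.List;

// illustrative sketch of the pub/sub wiring, not the real interfaces
interface CacheChangeListener {
 void cacheChanged();
}

class SpecialFooCache {
 private final List<CacheChangeListener> listeners = new ArrayList<CacheChangeListener>();

 void register(CacheChangeListener listener) {
  listeners.add(listener);
 }

 // called whenever the underlying cache refreshes itself
 void fireChanged() {
  for (CacheChangeListener l : listeners) {
   l.cacheChanged();
  }
 }
}

class FooManager implements CacheChangeListener {
 public void cacheChanged() {
  // rebuild the Foos-by-name / Foos-by-id / Foos-by-other-id indexes
 }
}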

All access to these data structures was done within read/write locks. I think the motivation behind using a read/write lock instead of a more generic synchronization mechanism was to optimize reads at the expense of writes. This made sense, given that writes were infrequent and driven by periodic cache changes, and reads did not need immediate access to new data. But it meant that any time you wanted to access the structures, you did so within the confines of a read/write lock. Even when all you wanted to do was read the data, you needed to acquire the read lock on it.
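
Concretely, every lookup looked more or less like the sketch below, using java.util.concurrent's ReentrantReadWriteLock; the class and field names are mine, not FooManager's:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class Foo { }

class FooIndexes {
 private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
 private final Map<String, Foo> foosByName = new HashMap<String, Foo>();

 // even a plain read has to take the read lock...
 Foo getByName(String name) {
  rwLock.readLock().lock();
  try {
   return foosByName.get(name);
  } finally {
   rwLock.readLock().unlock();
  }
 }

 // ...and the cache callback takes the write lock to rebuild the index
 void rebuild(Map<String, Foo> fresh) {
  rwLock.writeLock().lock();
  try {
   foosByName.clear();
   foosByName.putAll(fresh);
  } finally {
   rwLock.writeLock().unlock();
  }
 }
}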

The combination of the event callback and the read/write lock had a Design-Pattern-as-a-Hammer smell to it, and as I thought about it some more, I realized why.

(1) The data structures under read/write locks did not have to be immediately consistent with the cache, they had to be eventually consistent with the cache. That meant that a cache change did not require immediate and total synchronization.
(2) The read/write lock was overkill, but necessary because of the asynchronous nature of the cache refill.
(3) The overhead of reloading from the cache was minimal, since the cache's data structures were already in memory by the time the notification occurred.
(4) What we really wanted to do was only block when a thread was updating the data structures. We didn't want other threads re-updating the data structures. But they shouldn't be blocked from accessing the current data structures while the update was going on.

The eventual consistency and low reload overhead of the system made it possible to do away with the event callback interface and instead poll the cache for updates by checking a cache version number via a synchronized method. Here's how it worked:
  1. Every request would check the cache version. 
  2. The first thread that got an updated version number would enter a lock and update a set of temporary data structures. 
  3. All other threads would block on the lock, enter once the original writer method had exited, make the version check, get the same version that they had entered with, and not attempt a reload.

synchronized protected int checkExpired(int currentVersion) throws Exception{

  int newVersion = specialFooCache.getCacheVersion();
  if(newVersion != currentVersion) {
   // update temp data structures.
   ...
   // assign temps to member data structures
   ...
  }
  return newVersion;

}

Prior to every request, I checked the cache version and reloaded if necessary:

 // cacheVersion is a member variable of FooManager
 cacheVersion = checkExpired(cacheVersion);
 // now access structure w/o locks.


This was, well, OK. It was definitely simpler, but now all reader threads were blocked while waiting for the first one to update the data structures.

Thinking back to the original conditions, I remembered that as long as the data structures were eventually updated, all non writing threads would be just as happy using the original structures in an unblocked manner. In other words, we only needed to make sure that one thread reloaded the original structures.

This required synchronization at a more granular level. I removed synchronization on the checkExpired() method and, inside of it, guarded the rewrite code with a conditional check followed by a lock. Any threads that had gotten past the conditional and piled up on the lock would exit once (a) the lock was released and (b) they rechecked the version and realized that they didn't need to do the cache reload.

protected void checkExpired() {

 int newVersion = specialFooCache.getCacheVersion();

 // we only want one thread to refresh the cache, and we
 // want the other ones to keep using the old data structures.

 if(newVersion != cacheVersion && inProgress == false) {
  synchronized(lock) {
   // first thread is in b/c cacheVersion has not been updated yet.
   // All other threads evaluate to false.
   int checkVersion = specialFooCache.getCacheVersion();
   if(checkVersion != cacheVersion) {

    inProgress = true;
    // reload from cache
    .....
    // set data structures so that subsequent threads get
    // the new data
    ....
    // set the version.
    cacheVersion = newVersion;
    // we are done here.
    inProgress = false;
   } // end version recheck
  } // end lock.
 } // end version and inProgress check
}


Note that in order to prevent thread-local caching of stale references and allow all threads to get immediate access to the data structures after they were reassigned, I declared the data structures that I was updating as volatile.
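
In other words, the index fields ended up declared along these lines; the field names are placeholders, and this is just the relevant fragment of the class:

// reassigned wholesale inside checkExpired(); volatile makes the new
// references immediately visible to reader threads without extra locking
private volatile Map<String, Foo> foosById = new HashMap<String, Foo>();
private volatile Map<String, Foo> foosByName = new HashMap<String, Foo>();
private volatile int cacheVersion = 0;
private volatile boolean inProgress = false;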

The simplification of the cache update process made it a whole lot easier to understand. I think the conditions -- specifically the eventual consistency -- allowed us some latitude in when reader threads actually got the new data. There isn't a by-the-numbers approach to solving synchronization issues, but it always helps to understand what you are synchronizing and, more importantly, what you don't have to synchronize.

Thursday, April 1, 2010

Using Google Maps API v3 + JQuery.

My side project to display and do some detailed data mining of my GPS exercise data has been languishing for the better part of a year while my day job(s) have been taking most of my day and night time. Garmin has a pretty decent Mac desktop program that provides graphing, graph zooming, and stats by mile, but I'd rather have all of that functionality (and more!) via a web UI. I'd also like to integrate the results of the data mining into that UI in a useful way, i.e. mining relative effort over similar terrain to track actual fitness over time.

I decided that this project is going to be about having fun and nothing more, and as such decided to write the UI first. All of my work these days is server-side Java, so 'fun' for me involves more dynamic languages, i.e. JavaScript and Ruby.

I wanted to display my gps data as a route on a map, which meant getting up to speed on the Google Maps API, and writing a quick dummy server that could dump out some route data for me.

I decided to use the latest Google Maps API v3 , and of course JQuery for the front end work, and mocked up a quick backend server using Sinatra. I can always redo that backend in something more robust once I want to actually deploy, but for now getting the data to the page is more important than how fast that data is retrieved, or machine resources consumed by serving that data.

Part 1: Displaying The Map


I needed to include the Google Maps API v3 JS:

http://maps.google.com/maps/api/js?sensor=false
and the latest JQuery:
http://ajax.googleapis.com/ajax/libs/jquery/1.4.2/jquery.min.js

Then in the ready function, I created a map by pointing it to a div and passing in my options.

$(document).ready(function() {
    var myLatlng = new google.maps.LatLng(47.5615, -122.2168);
    var myOptions = {
          zoom: 12,
          center: myLatlng,
          mapTypeId: google.maps.MapTypeId.TERRAIN
    };
    var map = new google.maps.Map($("#map_canvas")[0], myOptions);
   
Note above that I'm assigning the map to a div with id = "map_canvas".

Part 2: Parsing the GPS data

Eventually I'd like to upload data directly from my device, but for now I'm going to skip that part and 'pretend' I've already done it. GPS data is exported in the TCX format, which is Garmin-proprietary, but is the easiest to use right now. My current desktop program has the ability to export 1 to N days worth of data into tcx.

At some point parsing tcx using a DOM based parser was going to start hurting, so I decided to use a SAX based parser from the start. My usual choice for quick n dirty XML/HTML parsing, hpricot, was therefore not an option. I investigated nokogiri, but eventually settled on libxml, mostly because its rdoc on SAX parsing was very clear, and it was much faster for SAX parsing.


I mainly wanted to parse lat-long data out of the tcx file and dump the coordinates into another file in JSON format. Here is my 5 minute hacked together code:


require 'rubygems'
require 'libxml'  # libxml-ruby
include LibXML

class PostCallbacks
 include XML::SaxParser::Callbacks

def initialize(write_file)
  @state="unset"
  @write_file = File.open(write_file,"w")
  @buffer = "{\"data\" : ["

end

def on_start_element_ns(element, attributes, prefix, uri, namespaces)

  if element == 'LatitudeDegrees'
   @state = "in_lat"
  elsif element == 'LongitudeDegrees'
   @state = "in_long"
  end
end

def on_characters(chars)
  if(@state=="in_lat")
   @buffer += "{\"lat\": #{chars}"
  elsif(@state == "in_long")
   @buffer += ", \"long\": #{chars}},"
  end
end

def on_end_element_ns(element,prefix,uri)

  @state="unset"
end

def on_end_document()

  @buffer = @buffer.slice(0,@buffer.length-1)
  @buffer += ("]}")
  @write_file.puts(@buffer)
  @write_file.close()
end
 

end

parser = XML::SaxParser.file(ARGV[0])
parser.callbacks = PostCallbacks.new(ARGV[1])
parser.parse


Part 3: Serving The GPS Data Up

My goal here was to basically dump that generated file as a response to an AJAX request. Sinatra is perfect for delivering quick services like this. I use Sinatra's DSL to handle a GET request as follows:

require 'rubygems'
require 'sinatra'
require 'open-uri'
require 'json'

  get '/sample_path.json' do
   content_type :json
   File.open("../output/out.json") do | file |
   file.gets
   end

  end


../output/out.json is where I dumped the lat-long data parsed from the tcx file.
I ended up spending a lot of time debugging my GET method (http://localhost:7000/sample_path.json). In FF I was unable to get a response back from my GET request. I googled around and apparently there are some compatibility issues between Firefox 3.6.2 and JQuery. I was, however, able to get the code to work in Safari, and I'm considering downgrading to FF 3.5 because I haven't seen those kinds of problems with that browser, and Firebug is an essential part of my debugging toolkit.

Part 4: Drawing The Data On the Map
The JS code that makes the request loads the results into a google.maps.MVCArray, which it then uses to create a polyline superimposed on the map:


var url = "http://localhost:4567/sample_path.json";
$.ajax({
  type: "GET",
  url: url,
  beforeSend: function(x) {
    if(x && x.overrideMimeType) {
      x.overrideMimeType("application/json;charset=UTF-8");
    }
  },
  dataType: "json",
  success: function(data, success) {

    var latLongArr = data['data'];
    var pathCoordinates = new google.maps.MVCArray();
    for(i = 0; i < latLongArr.length; i++) {

      // each coordinate is put into a LatLng.
      var latlng = new google.maps.LatLng(latLongArr[i]['lat'],
          latLongArr[i]['long']);
      pathCoordinates.insertAt(i, latlng);
    }
    // and this is where we actually draw it.
    var polyOptions = {
      path: pathCoordinates,
      strokeColor: '#ff0000',
      strokeOpacity: 1.0,
      strokeWeight: 1
    };

    poly = new google.maps.Polyline(polyOptions);
    poly.setMap(map);
  }
});

The Result

Not super impressive, but a good start!

Tuesday, March 9, 2010

Don't Drive by Dumb Dogmatic Data

My posting has fallen off severely since taking up the new job, mostly because I have had nothing intelligent to say while I'm trying to ramp up on the technology as well as the business drivers behind the technical decisions being made. However, something did come up today that I want to remember, and I've heard more than once that if you don't write it down, you won't remember it.

I've been thinking a lot about numbers. In this brave new world, a lot of companies are 'data driven', meaning they make their decisions based on the data around them, and if they can't measure it, they can't manage it.

That is a statement that sounds basically logical, assuming that
(1) what is being measured is clearly understood by everyone, and
(2) changes that occur to the measurement correlate well to overall system state.

In the engineering world,  people don't have a lot of patience with numbers that are not explainable. So the mythical 'sales numbers' that drive entire sales teams off of cliffs every quarter are usually sneered at by engineers, who hold themselves up as the high priests and priestesses of logic.

For engineering teams, numbers like TPS, MTTF, etc., are not only easy to conceptualize, but changes in them are good indicators of system functionality. More importantly to engineering organizations, you don't have to be a software developer to understand what a decrease in TPS or MTTF means to the business.

So engineering management is always looking for other numbers that encapsulate system health. Again, this is a perfectly reasonable goal, because good metrics serve as a useful abstraction layer around the grimy bits of the sausage factory. However, I think that the quest for engineering metrics is one more piece of evidence of how rational starting points end up being ridiculous the moment logic is abandoned in favor of dogma.
 
We've all laughed ourselves silly at the old stories of measuring programmer productivity by lines of code written, but what are the programmers of tomorrow going to laugh at? My first candidate would be measuring the quality of unit tests by the unit test code coverage metric -- specifically, what percentage of total lines of code is covered by unit tests.

These days unit test code coverage is easy to get. We get ours from a Cobertura plugin for Maven.
Code coverage is one of those measurements that initially sounds really good. If the test coverage decreases, that's bad, right? If it increases, well, good job to the developers!

Wait, not so fast. If line coverage is supposed to be an indicator of quality, that implies that just because a test causes code to be exercised, the test is good. But I can write lots of tests with zero assertions; all they verify is that, in very specific cases, there are no NPEs, and that's about it.
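
To make that concrete, here's a hypothetical JUnit 4 sketch: both tests below produce exactly the same line coverage for the little class under test, but only the second one actually verifies anything. The class and numbers are made up for illustration:

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.List;

import org.junit.Test;

public class OrderSummarizerTest {

 // tiny made-up class under test, defined inline to keep the example self-contained
 static class OrderSummarizer {
  double sum(List<Double> lineItems) {
   double total = 0.0;
   for (double item : lineItems) {
    total += item;
   }
   return total;
  }
 }

 // counts toward line coverage, but asserts nothing beyond "no exception thrown"
 @Test
 public void testSumRuns() {
  new OrderSummarizer().sum(Arrays.asList(10.0, 12.5, 20.0));
 }

 // identical coverage, but the contract is actually checked
 @Test
 public void testSumAddsLineItems() {
  assertEquals(42.5, new OrderSummarizer().sum(Arrays.asList(10.0, 12.5, 20.0)), 0.001);
 }
}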

If I take unit test line coverage to represent the quantity of unit tests written without looking at the number of assertions being made per test, I'm only seeing part of the picture. If unit test coverage changes are supposed to indicate whether testing is being done on new code, there could be lots of false positives and negatives. For example, when I add a bunch of code in a finally block, and the function I'm adding that code to is covered by a unit test, my line coverage goes up without me actually writing any more tests. Conversely, if I'm adding that finally block to a function that is not covered, my line coverage goes down. In either case, do the corresponding line coverage increases and decreases actually mean anything about the quality of the tests written?

What are good indicators of unit test quality if line coverage is misleading? As someone who writes a lot of unit tests, I would venture that test quality has some correlation to assertion density, with some caveats. In other words, what and how much is being checked when a method is tested? Assuming the tested method returns a value, there is at a minimum one thing to check. If the value is a structure, there is more.

In any case, high assertion density usually means that verification is being taken seriously, and also that any changes to the code have to pass all assertions, or the assertions need to be changed to match the new code. Either case requires explicit validation of the contract put in place by the assertions in the unit test. Note that assertion density is only valid when measuring direct output: if a test is verifying data that is not a direct output of the method being tested, is it condoning code side effects? Assertion density also needs to be normalized by the number of acceptable assertions, i.e. the number of things you can check in the return value, if there is a return value. The assertion density metric should score badly if data that is not explicitly related to the method output is being checked. But maybe that would be conflating the concerns of side-effect-free code and high quality unit testing.

Another metric in unit testing that correlates to good coverage is conditional branch coverage. If I can assume that every block of code may contain one or more possibly nested conditional statements, then I know that I've at least got decent coverage when a high percentage of conditional branches are covered. I don't think that branch coverage means a lot without assertion density checks, but it does mean a lot more than simple line coverage. Ironically, Cobertura provides branch coverage, but all of the QA managers I've worked with have gravitated towards line coverage as the more meaningful metric.
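
Here's a trivial, made-up example of why branch coverage is the stricter measure: a single test with a positive input touches every line of the method below, so line coverage reports 100%, but the implicit else of the if is never taken, so branch coverage is only 50%:

import static org.junit.Assert.assertEquals;

import org.junit.Test;

public class BranchCoverageExampleTest {

 // hypothetical method under test
 int clampToPositive(int x) {
  int result = 0;
  if (x > 0) {
   result = x;
  }
  return result;
 }

 // this single test touches every line of clampToPositive (100% line coverage),
 // but never takes the x <= 0 branch, so branch coverage is only 50%
 @Test
 public void testClampPositiveInput() {
  assertEquals(5, clampToPositive(5));
 }
}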

Ideally I would like to see a number based on assertion density and branch coverage. This number would behave well across a wide range of assertion and branch coverage input, sort of like the half your age plus seven dating metric.  That would make it meaningful, and a good measurement to drive test quality 'up and to the right'.