Wednesday, November 17, 2010

Pull Parsing with STAX

Foreward
Due to some events beyond my control, I lost the source code for parsing my Garmin TCX data into CSV format. I had not gotten around to Git-ifying my source, and also had not backed it up via JungleDisk/Mozy, so the original Ruby code that used the LibXML SAX parser is gone.

That might not necessarily be a bad thing. In SAX parsing, events happen without any surrounding context. It is up to the programmer to supply the context, and doing that in a legible, maintainable way with the SAX event driven model is a challenge. When I had discovered issues with the parsing code, fixing those issues required a lot of time to determine the actual state at the time the bug occurred. SAX parsing code has a significant maintenance penalty.

DOM parsing is much more straightforward because you navigate the structure of the XML, and implicitly get the associated context. Unfortunately, it is prohibitively expensive because it requires that the entire document get loaded into memory.

STAX parsing is a reasonable compromise between the two extremes. It streams the file (i.e. only loading into memory what it needs), while allowing the developer to navigate the structure of the XML. In other words, you have context without memory overhead.

STAX can be used to read or write XML files -- in my application, that converted .TCX files to a CSV format, I was focused on reading XML, not writing it. For reading files, STAX provides two different APIs. The cursor based API uses XMLStreamReader. The iterator based API uses XMLEventReader. The key difference between the two is that the iterator based API treats events as first class objects and allows the user to peek ahead at the next element to be fetched. This supplies the user with more context than the iterator based API. That context comes with additional resource consumption, but still much less than loading an entire DOM into memory.

Onward
In my first STAX program, I wanted to see how far I could get with the cursor based API. Specifically, given it's better performance characteristics, would it provide enough context for me to write easily maintainable code?

A Moment...
A quick side note on why (again) I'm choosing to convert an XML stream to CSV. XML is great because DTDs and Schemas provide a way to validate document integrity when there are large numbers of optional elements. In the case of TCX, there are few optional elements -- the elements that exist always contain the same kinds of data. With an unchanging format, CSV makes more sense because it is a more compact representation of a stable data set.

Implementation Details
In my re-implementation of TCX->CSV parsing code, I needed to transform a set of nested parameters into two different CSV formats. In order to explain what I needed to do, I need to go into detail about what the Garmin TCX XML looks like, and what I wanted to extract from it.

<?xml version="1.0" encoding="UTF-8" standalone="no" ?>
<TrainingCenterDatabase xmlns="http://www.garmin.com/xmlschemas/TrainingCenterDatabase/v2" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.garmin.com/xmlschemas/TrainingCenterDatabase/v2 http://www.garmin.com/xmlschemas/TrainingCenterDatabasev2.xsd">

  <Activities>
    <Activity Sport="Biking">
      <Id>2010-11-04T22:02:59Z</Id>
      <Lap StartTime="2010-11-04T22:02:59Z">
        <TotalTimeSeconds>1798.6000000</TotalTimeSeconds>
        <DistanceMeters>15412.0742188</DistanceMeters>
        <MaximumSpeed>12.8037510</MaximumSpeed>
        <Calories>581</Calories>
        <AverageHeartRateBpm xsi:type="HeartRateInBeatsPerMinute_t">
          <Value>140</Value>
        </AverageHeartRateBpm>
        <MaximumHeartRateBpm xsi:type="HeartRateInBeatsPerMinute_t">
          <Value>158</Value>
        </MaximumHeartRateBpm>
        <Intensity>Active</Intensity>
        <Cadence>0</Cadence>
        <TriggerMethod>Manual</TriggerMethod>
        <Track>
          <Trackpoint>
            <Time>2010-11-04T22:03:00Z</Time>
            <Position>
              <LatitudeDegrees>47.5834731</LatitudeDegrees>
              <LongitudeDegrees>-122.2491668</LongitudeDegrees>
            </Position>
            <AltitudeMeters>17.4411621</AltitudeMeters>
            <DistanceMeters>26.8590393</DistanceMeters>
            <HeartRateBpm xsi:type="HeartRateInBeatsPerMinute_t">
              <Value>85</Value>
            </HeartRateBpm>
            <SensorState>Absent</SensorState>
          </Trackpoint>
          ...
        </Track>
     </Lap>
     ...
   </Activity>
     ...
 <TrainingCenterDatabase>


I want to extract two kinds of data out of the XML stream above:
  1. Lap summary data. Lap summary data is good for high level comparisons of effort. The basic components of Lap summary data of the same general duration can be compared across laps.
  2. Trackpoint data. Trackpoint data -- elevation, heart rate, lat/long can be analyzed/transformed across arbitrary intervals to measure input effort and output speed.
Lap Summary Data will look like this:
activity_id,lap_id,total_time,total_distance,max_speed,total_calories,average_heartrate,max_heartrate
Trackpoint detail data will look like this:
lap_id,trackpoint_id,time, latitude,longitude,altitude,distance,heartrate

Both files may end up being used when correlating track points to their parent laps and activities.

Initializing the SAX Parser
I've created a class TCXPullParser to parse Garmin TCX data. In the constructor I initialize the STAX parser:

XMLStreamReader parser;
         .....
         /**
  * ctor, with all file names to write to and read from.
  * 
  * @param lapSummaryWriter
  * @param trackDetailWriter
  * @param fileToParse
  * @throws IOException
  * @throws XMLStreamException
  */
 public TCXPullParser(CSVWriter lapSummaryWriter, CSVWriter trackDetailWriter,
     String fileToParse) throws IOException, XMLStreamException {
  
  FileInputStream fis = new FileInputStream(fileToParse);
  XMLInputFactory factory = XMLInputFactory.newInstance();
  parser = factory.createXMLStreamReader(fis);
                ...
 }

Initialization is simple, the parser is created from the XMLInputFactory, and takes the FileInputStream created on the input file name. From this point my primary access to the file is through the parser object. I use the parser object to advance the cursor (by calling next()), inspect the type of element (as a return value from parser.next()), and grab text (parser.getText()). These three methods, with some additional functionality I've added, give me enough context to actually top-down parse the XML. 

Pull Parsing
One advantage of using pull parsing is that the code is in charge of when events are fired. This lets us do things like skip processing/move directly to an element that we are interested in by using the parser.next() method and checking the returned element type:

/**
 * skips parser to the start element of the specified element name, while
 * stopElementName has not been encountered.
 * 
 * @param parser
 * @param elementName
 * @param stopElementName
 *          if we get this far, we've gone too far.
 * @return true if element is found
 * @throws Exception
 */
protected boolean skipTo(XMLStreamReader parser, String elementName,
    String stopElementName) throws Exception {
 boolean found = false;
 int parseType = parser.getEventType();
 while (parser.hasNext()) {
     parseType = parser.next();
     if (parseType == XMLStreamReader.CHARACTERS) {
             continue;
     }
     String elName = parser.getLocalName();
        if (parseType == XMLStreamReader.START_ELEMENT) {
  if (elName.equals(elementName)) {
         found = true;
      break;
  } else if (elName.equals(stopElementName)) {
      // in the case where we are looking across parallel elements
      // or into a container element,
      // stop when we find the stop element
     found = false;
    break;
  }
         } else if (parseType == XMLStreamReader.END_ELEMENT
         && elName.equals(stopElementName)) {
  // in the case where we are looking within a container element, stop
  // when we reach the end of that container element
  found = false;
  break;
     }
 }
 return found;
}

I typically use skipTo() to move to the next instance of an element, before it's containing element end tag is reached. For example, when I'm parsing the contents of a Trackpoint tag:

<Trackpoint>
   <Time>2010-11-04T22:03:00Z</Time>
   <Position>
   <LatitudeDegrees>47.5834731</LatitudeDegrees>
   <LongitudeDegrees>-122.2491668</LongitudeDegrees>
   </Position>
   <AltitudeMeters>17.4411621</AltitudeMeters>
   <DistanceMeters>26.8590393</DistanceMeters>
   <HeartRateBpm xsi:type="HeartRateInBeatsPerMinute_t">
   <Value>85</Value>
   </HeartRateBpm>
   <SensorState>Absent</SensorState>
</Trackpoint>


This is the code to parse that data:

/**
 * parses a single trackpoint and writes an output line to the trackDetail CSV
 * writer.
 * 
 * @param parser
 * @throws Exception
 */
protected void parseTrackPoint(XMLStreamReader parser, String lapId,
    String trackPointId) throws Exception {
 trackDetailWriter.writeArg(lapId);
 trackDetailWriter.writeArg(trackPointId);
 skipTo(parser, TIME, TRACKPOINT);
 trackDetailWriter.writeArg(getTimeValue(parser, parser.next()));
 skipTo(parser, LAT, TRACKPOINT);
 trackDetailWriter.writeArg(getValue(parser, parser.next()));
 skipTo(parser, LONG, TRACKPOINT);
 trackDetailWriter.writeArg(getValue(parser, parser.next()));
 skipTo(parser, ALT, TRACKPOINT);
 trackDetailWriter.writeArg(getValue(parser, parser.next()));
 skipTo(parser, DIST, TRACKPOINT);
 trackDetailWriter.writeArg(getValue(parser, parser.next()));
 skipTo(parser, HEARTRATE, TRACKPOINT);
 trackDetailWriter.writeArg(getValue(parser, parser.next()));
 trackDetailWriter.flushArgs();
}

With skipTo in place, I needed to extract the data from the XML. The text inside of elements is CHARACTER data, and is accessed by calling parser.next() after hitting the enclosing tag START_ELEMENT. The CHARACTER data is accessed via XMLStreamParser.getText():

/**
 * extracts a double value from a character stream.
 * 
 * @param parser
 * @param parseType
 * @return the double, or -1 if the element is not CHARACTERS. will also
 *         thrown runtime exception if parsing fails.
 */
private double getValue(XMLStreamReader parser, int parseType) {
 if (parseType == XMLStreamConstants.CHARACTERS) {
  return Double.parseDouble(parser.getText());
 } else {
  return -1;
 }
}


The combination of skipTo and getValue() allows me to extract Double values from the XML. I'm using Double to validate the format of the value, even though I'm going to persist that value back as a string. When extracting timestamps, I extract the data to a long:
private long getTimeValue(XMLStreamReader parser, int parseType)
    throws ParseException {
 if (parseType == XMLStreamConstants.CHARACTERS) {
  String raw = parser.getText();
  String date = raw.substring(0, raw.indexOf('T'));
  String time = raw.substring(raw.indexOf('T') + 1, raw.indexOf('Z'));
  SimpleDateFormat sdf = new SimpleDateFormat("MM-dd-yyyy-HH:mm:ss");
  Date actual = sdf.parse(date + '-' + time);
  return actual.getTime();
 } else {
  return -1;
 }
}

Separating Reading XML from Writing CSV
In the code above there are calls to a trackDetailWriter object. I chose to separate the writing of the CSV from the reading of the XML in order to test the XML reading logic more easily. This simplified things a lot, it allowed me to pass in CSVWriter objects, it relieved the parsing code of having to manage/open/close destination files, and it allowed me to write test implementations of CSVWriter that stored the data in memory for me to check during unit tests.

public interface CSVWriter {

 /**
  * write a single arg
  * @param arg
  * @throws Exception
  */
 public void writeArg(Object arg) throws Exception;
 
 /**
  * flush all pending args as a single CSV line
  * @throws Exception 
  */
 public void flushArgs() throws Exception;
 
}

The default implementation (used at runtime) looks like this:

/**
 * 
 * @author Arun Jacob
 *
 * push comma separated values to a file. 
 */
public class DefaultCSVWriterImpl implements CSVWriter {

 private FileWriter writer;
 private StringBuffer buffer;
 
 public DefaultCSVWriterImpl(String fileName) throws IOException {
  writer = new FileWriter(fileName);
  buffer = new StringBuffer();
 }
  
 /**
  * close the file: REQUIRED for all file writers.
  * @throws Exception
  */
 public void close() throws Exception {
  writer.flush();
  writer.close();
 }

 @Override
 public void writeArg(Object arg) throws Exception {
  buffer.append(arg);
  buffer.append(",");
 }

 @Override
 public void flushArgs() throws Exception {
  writeToFile(buffer);
 }

 /**
  * flush the contents of the buffer to file
  * @param buffer
  * @throws IOException
  */
 private void writeToFile(StringBuffer buffer) throws IOException {
  if(buffer.charAt(buffer.length()-1) == ',') {
   // remove the last comma before writing. 
   writer.write(buffer.toString().substring(0,buffer.length()-1));
  } else {
   writer.write(buffer.toString());
  }
  
  resetBuffer(buffer);
 }

 /**
  * clear out the StringBuffer
  * @param buffer
  */
 private void resetBuffer(StringBuffer buffer) {
  if(buffer.length() > 0) {
   buffer.delete(0,buffer.length());
  }
 }

}

The test implementation (used to verify that values are being pulled from the XML correctly) looks like this:

public class TestTrackPointCSVWriter implements CSVWriter {
 
 static final String TRACKPOINTID = "TrackPointId";

 static final String LAPID = "LapId";

 static final String ACTIVITYID = "activityId";

 List<Object> args;
 Map<String,Object> argsMap;
 public TestTrackPointCSVWriter() {
  args = new ArrayList<Object>();
  argsMap = new HashMap<String,Object>();
 }
 

 @Override
 public void writeArg(Object arg) throws Exception {
  args.add(arg);
  
 }

 @Override
 public void flushArgs() throws Exception {
  argsMap.put(LAPID, args.get(0));
  argsMap.put(TRACKPOINTID, args.get(1));
  argsMap.put(TCXPullParser.TIME, args.get(2));
  argsMap.put(TCXPullParser.LAT,args.get(3));
  argsMap.put(TCXPullParser.LONG, args.get(4));
  argsMap.put(TCXPullParser.ALT, args.get(5));
  argsMap.put(TCXPullParser.DIST, args.get(6));
  argsMap.put(TCXPullParser.HEARTRATE, args.get(7));
  args.clear();
  
 }

 
 /**
  * validation method
  * @param key
  * @return
  */
 public Object get(String key) {
  return argsMap.get(key);
 }

}

Conclusion
I'm not sure what I was thinking when I wrote the original SAX parser for TCX data, other than I just like to write in Ruby. The additional context that I get by being able to pull tags instead of getting them pushed at me makes the code much easier to follow and therefore maintain. 

Friday, September 24, 2010

Datamining my GPS/HRM Data, Step 2: Pig for ETL

Overview
Now that I've extracted 3 years worth of GPS/HRM data into CSV format, I want to get some basic summary information. Specifically, I want
  • the total distance covered for running, biking and 'other' (usually skate skiing) per month. 
  • average run and ride mileage per month
  • average running and cycling pace per month
Pretty no brainer stuff that I could probably write a quick 'n dirty ruby script to do for my paltry 40MB of data. 

But...I want to use Pig. While this may seem like a candidate for the "Cutting Butter With A Chainsaw" award, I'm actually trying to show that Pig Scripting is pretty damn useful for ETL. The nice thing about Pig is that when I'm dealing with 40GB or TB of input data, the same script can be run across a multi node cluster without changes to the basic logic.  

I'm also going to try and capture what I've learned from being in the trenches with Pig over the last couple of months. We use it at work to process TBs of log data, and while it definitely has it's warty aspects, I find that it is well suited for basic summary, grouping, and filtering operations. 

Pig Setup
I downloaded and setup as described here. Pig assumes that you've got Hadoop installed, and specifically DFS started, as well.  I start DFS from the bin directory of my hadoop install: start-dfs.sh.
I'm running Pig in psuedo-local mode (with my local box set up as a single node hadoop cluster):

pig -fpigfile.pig

I symlinked the pig shell script to /usr/bin/pig for ease of use. I have HADOOP_HOME and JAVA_HOME defined, the pig shell script uses both, and assumes that it is located in a bin directory in a standard pig install. 

Pig Latin, UDFs, and Tuples
Pig scripts are written in Pig Latin. I'm going to cover a small subset of Pig Latin in this script, see full references part I and II for comprehensive overviews. Where Pig Latin falls short, Pig has User Defined Functions that allow loading, storing, and transforming data. UDFs are written in java and extend defined interfaces. 

Pig Latin and UDFs manipulate Tuples to generate sets of Tuples. Tuples are groups of 1..N values, where a value can be an int, a float, a string, another tuple, or a bag (set) of values. In Pig, you see a lot of 
   X = {do something in pig on} Y;

where Y is an original set of Tuples and X is the transformed result. Transformations of Data are done as Map-Reduce jobs. The complexity of what you are transforming determines how many map-reduce jobs are run.

The Goods

Load the data.


I'm loading the detail files generated from the XML->csv transform I did last time. I loaded all detail files into pig using the default PigStorage() UDF. In order to use this UDF, I had to specify the format of all fields in each row:
A = LOAD '/csv/input/details*' USING PigStorage() AS (activityId:chararray, lapId:chararray, time:chararray, latitude:float, longitude:float, alt:float,dist:float,hr:int);

This gives me a tuple A that contains data as specified above. Note that this tuple represents all of the rows of loaded data. Also note that I used a wildcard to load all files in at the same time. PigStorage() also allows the user to separate locations by comma, which is really great when loading from multiple locations.

Note that the locations above are in HDFS.  I uploaded all of my data into HDFS using the hadoop fs -put command.

Extract the month from the timestamp using a custom UDF.


Right now I've got the timestamp formatted as a character array. I'm going to need to extract month data and add it to the list of tuple fields in order to get monthly averages. This is where the simple functionality of the pig script is supplanted by a UDF.

UDFs, as mentioned before, are used to load, store, and transform/extract data. I'm going to write one that extracts the month as an integer between 1-12  from the time string. Note in the listing below I broke out the actual formatting into a separate function that I could unit test w/o having to create a Tuple instance.


package com.infovoracious.udfs;

import java.io.IOException;
import java.util.Calendar;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class GetMonth extends EvalFunc package com.infovoracious.udfs;

import java.io.IOException;
import java.util.Calendar;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple; 
public class GetMonth extends EvalFunc (Integer) { 
// parens represent brackets, this is a templated class 
private static final int TIME_INDEX = 2;

  @Override
  public Integer exec(Tuple input) throws IOException {
    if (input == null || input.size() == 0)
      return null;
    try {
      // looks like this: 2010-02-24T14:22:29Z
      String str = (String) input.get(TIME_INDEX);
      if(str != null) {
        return DateUtils.dateFromFormattedTime(str,Calendar.WEEK_OF_MONTH)
      } else { 
        return -1;
      }
      return DateUtils.dateFromFormattedTime(str, Calendar.MONTH) + 1;
    } catch (Exception e) {
      throw new IOException("Caught exception processing input row ", e);
    }
  }

}

  private static final int TIME_INDEX = 0;

  @Override
  public Integer exec(Tuple input) throws IOException {
    if (input == null || input.size() == 0)
      return null;
    try {
      // looks like this: 2010-02-24T14:22:29Z
      String str = (String) input.get(TIME_INDEX);

      return DateUtils.dateFromFormattedTime(str, Calendar.MONTH) + 1;
    } catch (Exception e) {
      throw new IOException("Caught exception processing input row ", e);
    }
  }

}

I also created a UDF to generate the week of the month as well. I'm storing both the month and the week of the month as additional columns.

Load the UDFs.

In order for my script to see the UDFs I have just written, I need to load the jar that the UDFs reside in, and alias the methods so that they can be called in the script:

This is done by using the REGISTER keyword at the top of the script to load the jar, and the DEFINE keywords to alias UDFs. Note that when I define the UDF, I'm specifying it's constructor. In the cases above, GetMonth and GetWeekOfMonth both have default (no parameter) constructors.

REGISTER foo.jar;
    DEFINE extract_month com.infovoracious.udfs.GetMonth();
    DEFINE extract_week com.infovoracious.udfs.GetWeekOfMonth();

I invoke the methods like this:

-- the load from last time
A = LOAD '/csv/input/details*' USING PigStorage() AS (activityId:chararray, lapId:chararray, time:chararray, latitude:float, longitude:float, alt:float,dist:float,hr:int);

-- adding columns to a tuple.
B = FOREACH A GENERATE *, extract_month(time) as month , extract_week(time) as week;


Sort by month and week of month.
Now that we have the columns, we can sort by them. Sorting by month, week of month, groups values by unique month,week of month tuples:

C = GROUP B BY (month,week);
This creates a tuple that looks like this:
{month, week}, array length 1..N of {activityId,lapId,time,latitude,longitude,alt,dist,hr}
actual values would look like this:

{1,1},[{..},{..},..]
{1,2},[{..}...]
...

Summarize data.
With columns now grouped, I can summarize data from the grouped tuples. I want to summarize mileage per week. I can do that as follows (showing the whole script for continuity)


-- the load from last time
A = LOAD '/csv/input/details*' USING PigStorage() AS (activityId:chararray, lapId:chararray, time:chararray, latitude:float, longitude:float, alt:float,dist:float,hr:int);

-- adding columns to a tuple.
B = FOREACH A GENERATE *, extract_month(time) as month , extract_week(time) as week;

C = GROUP B BY (month,week);

-- the tuple now looks like group(month,week) B(tuple contents), 
-- so I reference dist as a member of the B tuple.

D = FOREACH C GENERATE *, SUM(B.dist) as total_dist;


Flatten the grouping columns.
Using the FLATTEN keyword to remove tuple nesting has different effects depending on where you use it. If you use FLATTEN to flatten the grouped columns, it merely removes the bag from around the grouped columns:
{month, week}, [array of tuples]
becomes
month, week, [array of tuples].
Flattening out the array generates a new row for each array element:
1,2,[{a,b},{c,d}]
becomes
1,2 ,a,b
and
1,2,c,d
In this case I want to flatten out the grouping columns:

-- the load 
A = LOAD '/csv/input/details*' USING PigStorage() AS (activityId:chararray, lapId:chararray, time:chararray, latitude:float, longitude:float, alt:float,dist:float,hr:int);

-- adding columns to a tuple.
B = FOREACH A GENERATE *, extract_month(time) as month , extract_week(time) as week;

-- grouping
C = GROUP B BY (month,week);

-- summing (and discarding what I don't need)
D = FOREACH C GENERATE SUM(dist) as total_dist;

-- flattening:
-- note how I'm referring to the grouping columns by the 'group' special keyword.
-- note how total_dist is not scoped by an enclosing tuple b/c of the way it was generated above.
E = FOREACH D GENERATE FLATTEN(group),total_dist;

Store it.
Finally, I want to store the results of my work. This is done using the default PigStorage() store UDF:

STORE E into '$some_dir';

Note that I didn't need to specify 'using PigStorage()'. I also used a variable, which I pass into the pig script as a key-value pair, like this:

pig -p some_dir=foo...

And that's it! I've summed my mileage from each week per month over the last three years: in six lines of script!

Conclusion
Pig is a powerful tool to clean up and perform basic operations on data.  When you know what you need to do, and have to do it on a lot of data, it works remarkably well. That said, there are a couple of things that start to hurt over time:
  1. When you write a lot of Pig, you start wanting to re-use sections of script, i.e. make them functions. Except you cant. Yet. So you end up either cutting and pasting (bad), or saving variables off for later use (confusing when you try to use them later) instead of writing script based UDFs (preferred). 
  2. There is no conditional execution of an operation. I can't test a variable state and then execute one or another statements based on that variable. What this means in real world use is that I do all of my conditional testing in bash, prior to invoking the pig script, and build up variables based on the results of those conditional tests. Those variables then get accessed by operations.  
  3. The default pig serialization format is great for the data I was using above because it had no commas which are the default Pig serialization format delimiter. You can change the delimiter, but that begs the question of what happens when someone has injected your delimiter into the data you are trying to process. 
  4. Higher level workflow is not solved with Pig. We started out running pig via a set of cron jobs, which soon turned into an admin's worst nightmare. Besides just being hard to maintain, we were also not taking advantage of the actual sources and destinations of pig data to schedule the work. In other words, if Pig job A produced intermediate format B and then continued running, we had no way of starting Pig Job C that loaded format B until Pig job A completed.  We are currently evaluating oozie at work to see if it can improve on cron+bash (a low bar, I know).
Warts and all, Pig sure beats writing map reduces for simple grouping/aggregation functionality. We still write map reduces when we are doing more complex operations, and our ETL job flows end up looking like a combination of pig, raw map-reduce, and HDFS access. But Pig really allows us to get away from a lot of boilerplate work (and maintenance!) for processing the data. Which gives us more time to analyze it.

Next
Of course, just getting summary statistics on my data hasn't really scratched the itch that started me off in the first place. While I now have an idea of my general fitness month to month, I still don't know enough about the amount of hard versus easy running/cycling I did, or how fast I was going when I was going hard or easy. Answering those questions will help me determine my fitness, which I previously defined as heart rate vs pace, accounting for terrain.  In order to get a metric for that definition of fitness, I need to define of what 'hard' and 'easy' really are for me, and analyze how specific values of those workouts have changed for the better or worse over time. 

Tuesday, September 7, 2010

Data Mining My GPS/HRM Data: Step 1, Formatting the Data

I've been wanting to analyze the data from my Garmin 305 for a while now. I've been a casual runner/biker/hardcore data geek for a while now, and last year I started doing triathlons, which means even more data to analyze. While I've always been curious, I just haven't had a great 'need' to analyze it until now...

I'm switching from a primarily heart rate based training program to a 'pace based' training program. The former had me training within specific heart rate zones, the latter has me running at specific pace ranges. I'm a 'measurer', and I'm curious to see how effective (or not!) the pace based training program is.  Fortunately I have one device that tracks both heart rate and speed, and I can analyze the effect that the pace based training has relative to the effect that the heart rate based training has on my overall fitness.

In this case I'm measuring fitness as a combination of heart rate vs terrain covered, i.e. hilly vs flat, vs pace. In order to measure my fitness up to now and going forward, I need to answer the following questions:
  1. how much time did I spend in 'recovery' mode, where my heart rate was < 70% max
  2. how much time did I spend in 'pain cave' mode, where my heart rate was > 85-90% max?
  3. how much faster (or slower) did I get at the same heart rate over the last year? 
  4. for the new pace based program, how much time am  I spending at the different paces, i.e. recovery pace, base pace, marathon pace, 1/2 marathon pace, 10k pace, 5k pace, 1 mile pace? 
  5. what is my average heart rate for those paces? 
  6. how much faster (slower) am I getting? 
I'm hoping to answer these questions using several approaches and several technologies that I've been using at work, and others that I've been itching to try. 

The first thing I needed to do prior to doing any analysis was to format the data into a format that I could easily operate on. The data is exported from the device into a format called tcx, which is a schema-validated XML. 

I need the data in csv format, mainly because the tools I want to process the data with are all hadoop based, and while I've read that it is not only possible, but easy, to process XML with hadoop,  hadoop works best with csv formats. XML is a nice format for nested data, and this is nested data, with the following structure
  • activities
    • activity
      • laps
        • lap -- contains summary averages from trackpoint data (see below)
          • trackpoints
            • trackpoint -- contains snapshot heart rate, altitude, distance, etc.
XML is especially good when there are optional attributes. CSV tends to suck with optional attributes, because nothing is optional. In this case none of the attributes are optional, so ultimately XML is overkill for storing this data.

I collapsed this structure into two csv lists: summary data and detail data, because I planned to act on summary and detail data separately. 

The summary data contains lap summary data:
  • activity id, lap id, total time, total distance, max speed, max heart rate, average heart rate, calories, number of trackpoints.
The detail data contains trackpoint data:
  • lap id,time, latitude, longitude, altitude, distance, heart rate
To do the conversion, I used the ruby libxml Sax parser. In order to use the libxml sax parser, I needed to create a callback handler that implemented the methods I wanted to override.


class PostCallbacks
  include XML::SaxParser::Callbacks

  def on_start_element_ns(element, attributes, prefix, uri, namespaces)
  ...
  end


  def on_characters(chars)
  ...
  end


  def on_end_element_ns (name, prefix, uri)
  ...
  end
end

In the callback handler, I maintained state to track nested XML objects. Typically I would assign state in the on_start_element_ns() method, act on that state in the on_characters() method, and release state in the on_end_elemebt_ns() method. I would also flush my results to disk occasionally to avoid taking up an unreasonable amount of memory.

I had about 40 Meg of data from the previous 3 years, which was parsed into csv files in approximately 44 seconds. I'm more than happy with that performance right now, because this is essentially a one-off job to get the data.

Next Up: setting up a data processing pipeline using Pig.

Friday, August 27, 2010

Beware the Emerging God Object

Most God Objects are pretty obvious. Bloated, side effect filled, their existence guaranteed by collective fear about refactoring them to a state where their previous functionality is not reproducible.

The God Object that derailed me for the last couple of days was not bloated, it's functions (for the most part) did not have side effects. It did a lot, but did not seem to be overwrought.  In fact, I'm pretty sure it only had a minor God Complex until I started refactoring it!

As a software engineer, my goal is to produce the most effective, robust software with the least amount of effort. So, in an attempt to not repeat last week's thrash-fest, I'm going to catalog the warning signs, the resolution, and the corrected approach, because I'm pretty sure I'm going to be in this situation again.

The Task:
We need to make an existing protoype table driven. The prototype functionality configured and ran A/B tests. Its configuration data was currently hardcoded into an enum.

Warning Signs Before We Even Started:
  1. The core logic of the A/B test configuration  resided in a single object.
  2. Despite having several distinct (logical) sub components, this object was responsible for serializing itself and those sub objects.
  3. This object existed in both our configuration time and runtime environments, which have significant differences, the biggest one being that at configuration time, all objects are read/written to a relational database, and at runtime, they are read from a BDB store. Most conceptual objects in the system have config time and runtime implementations.
  4. We could not get rid of this object because it was still being used in production. We would have to implement a phased approach to upgrading it. 
  5. I did not know the codebase well. 
  6. Our sprints are a week, so we felt compelled to fit in an end to end task into a week. 
The first four warning signs are simply facts. The last two are things that I should pay more attention to. If I don't know the codebase well, and I'm trying to make significant improvements in a week, my chances of success go way down. I chose to ignore that reality.

Warning Signs Once We Started:

  1. The table driven logic did not mesh well with the existing logic. The developer who had written the code had serialized all sub objects into a JSON string. The object was then serialized into a BDB file as a set of attributes and the JSON string, not an object graph. Trying to work with the new table data (an object graph) mean that:
  2. Any change I made to the code had side effects. This is because I was trying to preserve the original signature of the object to maintain backward compatibility with the rest of the system. This became obvious when:
  3. The original code, fairly straightforward, became conditional-heavy. Instead of acknowledging the additional complexity, 
  4. I was choosing to 'hack' more because I knew that we would have time to rework the code. I knew I was hacking because
  5. The developer I was working with started to get more confused with every checkin I made. 

Do Over!
When his brow started cramping up from being so furrowed, I realized that things were wrong. We grabbed a whiteboard and went to work, depicting the original workflow, how adding table driven configurations changed that workflow, and how to change the existing workflow as little as possible to reduce the amount of rework that we would have to do in the runtime portion of the code.  Most of the good ideas came from our dev lead, who had been silently (and then not so silently) observing us trying to make sense of it all. I mention that because he was operating at a distance from a lot of the complexity, and his solution avoided most of it. I felt that he out of all of us was able to make the necessary leap out of the stew of constraints and details, and reframe the problem in a way that made it a much less complex problem. That's the genius of software. That's what I do, but only sometimes. That's why I'm writing this down.

The Problems and Their Solutions: 

  1. The old functionality contained both runtime behavior and configuration and behavior. We chose to separate the two, resulting in a POJO configuration data object used at configuration time, and the 'action' object that knew how to initialize given one of those data objects and contained runtime behavior.
  2. More separation of concerns: The old way of storing data in a JSON string  and the new table driven way of storing data did not belong together in the same object.  Even though we separated the old object into a data object and an 'action' object, we chose to keep the old data separate from the new data, and access both through getters/setters, which insulated us from the underlying implementations. 
  3. Composition: because we didn't have enough time to rewrite the infrastructure that wrote the configuration object into a BDB, we needed the new data to be persisted into  configuration object as part of the JSON string. Fortunately, once we separated out the data from the configuration object, we could compose the JSON string outside of the configuration object using the POJOs, then set it prior to serializing the configuration object to the BDB file. 

Conclusions:
The warning signs before start played out into more significant problems before they were fully resolved. And, when I think back on the process, I realize I was uncomfortable with the changes I was making, but I chose to push on. Because time was relatively compressed, I found myself making 'poor choices', which ultimately backfired. Next time (these are notes to myself, the 'you' I'm taking to is 'me' :)

  1. Acknowledge risks up front. Not a lot of time to get it done? That's a risk. Don't know the codebase? That's another risk (assuming the codebase is non trivial). 
  2. Examine the old code and determine if there are concerns that should be separated before anything else is done. 
  3. Determine the scope of those changes to see if they can be met in the desired time frame.
  4. If not, break the project into smaller chunks. 
  5. If it feels easy it's right. If it feels hard, it's less right. Be more right. 
  6. Do not hack. When you find yourself hacking, stop. Back up and revisit initial assumptions. Step away from the problem.
  7. Discuss ideas often, with others. If someone else can understand it easily, it can't be that bad. If, on the other hand, their brow starts to wrinkle, and your blood pressure starts to go up because there are not words in the English language to describe what you need to describe, it might just be worse than you think. 
More Conclusions: 

  1. God objects never started out as God Objects. They started out as high level concepts -- "oh, I need something that does X", and mutate because of bolted on additional functionality. 
  2. Most developers don't set out to create a God Object. In this specific case I was trying to save effort by reducing the amount of change to the original code. However, the additional effort to explain and understand my 'bolt ons' was costing real time and effort. 
  3. The earlier you de-factor complexity out of a God Object, the better. Decomposing layered functionality into discrete objects makes it really easy for other people to recognize what those objects are for, and not overlay them with functionality that addresses other concerns. 
  4. Conversely, the longer you wait, the harder it is. God Objects tend to have code with lots of side-effects and implicit assumptions. Refactoring those successfully is hard, even with comprehensive unit tests. 



Wednesday, April 21, 2010

Synchronization Redux

In my last post, I discussed how I had refactored the synchronization around some data structures to reduce the need to lock those data structures except when necessary. My solution was cleaner in that the synchronization was localized, but the actual synchronization was still a little bit hairy in that it required a recheck inside the lock:

int newVersion = specialFooCache.getCacheVersion();

 // we only want one thread to refresh the cache, and we
 // want the other ones to keep using the old data structures.

 if(newVersion != cacheVersion && inProgress == false) {
  synchronized(lock) {
   // first thread is in b/c cacheVersion has not been updated yet.    

   // All other threads    evaluate to false.
   int checkVersion = flexibleStrategyCache.getCacheVersion();
   if(checkVersion != cacheVersion) {

   }
  }
 }

But I was (a) stuck and (b) had other code to refactor under the gun. So I didn't think about it much until today in the code review. I'm really glad I code reviewed with John, because he is a relentless simplifier whose lives by the 'less code' motto.

Right away I could tell that the whole recheck thing wasn't sitting well with him. The more I explained, the more concerned he looked, until he spotted a potential race condition between threads that might have piled up outside the synchronize when the someCache had incremented while it was being reloaded. These threads would immediately reload even when they didn't have to, which didn't affect the integrity of the in memory data structures, but was definitely a bad use of cpu. He also was insistent that there had to be an easier way, and under his direction this is that easier way:

 int newVersion = 0;
 // lock is shorter
 synchronized(lock) {
  newVersion = specialFooCache.getCacheVersion();
 
  // leave right away if in progress
  if(cacheVersion == newVersion || inProgress == true) {
   return;
 }
 
  inProgress = true;
 }
 
 
 try {
  // load from cache
 
 } catch (Exception e) {
 ...
 } finally {
  synchronized(lock) {
   // set the version.
   cacheVersion = newVersion;
   // we are done here.
   inProgress = false;
   
  }
 }

A couple of things to note:
(1) This solution is definitely easier and more performant in addition to being correct. The lock is not held during the reload, and when it is released other threads will exit the function immediately if a cache rewrite is in progress.
(2) The key thing that I learned from watching John solve the problem is that he wasn't happy until it was dead simple. When I don't take this approach, or abandon it under pressure, there are some potentially costly ramifications. The original solution had a complexity smell around it that I chose to ignore. The problem with ignoring that smell is that it comes back to haunt after you've forgotten why you wrote it in the first place. The solution is, of course, to write code that is simple enough to re-understand months from now. Or simple enough for anyone not familiar with your code to understand.
(3) the lock at the bottom is not actually necessary since (a) the variables being set are volatile and (b) only one thread will get this far. However, since only one thread is getting this far, it is not a performance issue, and actually lends some clarity to the resetting of test conditions.
(4) I still feel like this is somewhat a work in progress...I'm not convinced that (3) is really required (even though we both signed off on it). But I do feel a lot better about the current algorithm. That said,
(5) I love code reviews. The knowledge transfer is huge. I'm always looking for ways to strip my code down, and the insight I get from a good reviewer (like John) raises my game tremendously.

Thursday, April 15, 2010

Synchronization Requirements

Recently I was lucky enough to pull the short straw and refactor the dreaded FooManager (name changed to protect the innocent). FooManager was a horrendously complex, overwrought class that suffered from at least two God Complexes, and my team had been playing hot potato with it for the last couple of months, since we learned we had inherited it.

My mission was to make FooManager understandable/maintainable without requiring heavy ingestion of psychotropic substances or becoming suicidal. After doing the usual de-Godification and Cleverectomy procedures, i.e. choosing decent abstractions, removing static methods, etc, I was left with a fundamental synchronization dilemma.  It was far from the most annoying part of FooManager, but it was definitely the most interesting to refactor.

FooManager received updates for each specific Foo subclass from different caches. It registered for those updates using a callback interface. The cache notified FooManager whenever it changed. This seemed like an innocent enough pub/sub pattern. But it made managing the datastructures in FooManager -- the ones that held pre-computed indexes of Foos by name, Foos by id, Foos by other id, etc -- tricky.

All access to these data structures were done within read/write locks. I think the motivation behind using a read/write lock instead of a more generic synchronization mechanism was to optimize reads at the expense of writes. This made sense, given that writes were infrequent and driven by periodic cache changes, and reads did not need immediate access to new data. But it meant that any time you wanted to access the structures, you did so within the confines of a read/write lock. Even when all you wanted to do was read the data, you needed to acquire the read lock on it.

The event-callback and the read/write lock had a Design-Pattern-as-a-Hammer smell to it, and as I thought about it some more, I realized why.

(1) The data structures under read/write locks did not have to be immediately consistent with the cache, they had to be eventually consistent with the cache. That meant that a cache change did not require immediate and total synchronization.
(2) The read/write lock was overkill, but necessary because of the asynchronous nature of the cache refill.
(3) The loading performance of the cache was really minimal, since the cache data structures were in memory by the time the notification occurred.
(4) What we really wanted to do was only block when a thread was updating the data structures. We didn't want other threads re-updating the data structures. But they shouldn't be blocked from accessing the current data structures while the update was going on.

The eventual consistency and low reload overhead of the system made it possible to do away with the event callback interface and poll the cache for an update by checking a cache version number via a synchronized method. Heres how it worked:
  1. Every request would check the cache version. 
  2. The first thread that got an updated version number would enter a lock and update a set of temporary data structures. 
  3. All other threads would block on the lock, enter once the original writer method had exited, make the version check, get the same version that they had entered with, and not attempt a reload.

synchronized protected int checkExpired(int currentVersion) throws Exception{

  int newVersion = specialFooCache.getCacheVersion();
  if(newVersion != currentVersion) {
   // update temp data structures.
   ...
   // assign temps to member data structures
   ...
  }
  return newVersion;

}

Prior to every request, I checked the cache version and reloaded if necessary:

 // cacheVersion is a member variable of FooManager
 cacheVersion = checkExpired(cacheVersion);
 // now access structure w/o locks.


This was, well, OK. It was definitely more simple, but now all reader threads were blocked while waiting for the first one to update the data structures.

Thinking back to the original conditions, I remembered that as long as the data structures were eventually updated, all non writing threads would be just as happy using the original structures in an unblocked manner. In other words, we only needed to make sure that one thread reloaded the original structures.

This required synchronization at a more granular level: I removed synchronization on the checkExpired() method, and inside of it, blocked the rewrite code using a conditional statement followed by a lock, and allowed all threads that had gotten past the conditional and stuck on the lock to exit once (a) the lock was released and (b) they realized that they didn't need to do the cache reload.

protected void checkExpired() {

 int newVersion = specialFooCache.getCacheVersion();

 // we only want one thread to refresh the cache, and we
 // want the other ones to keep using the old data structures.

 if(newVersion != cacheVersion && inProgress == false) {
  synchronized(lock) {
   // first thread is in b/c cacheVersion has not been updated yet.    

   // All other threads    evaluate to false.
   int checkVersion = specialFooCache.getCacheVersion();
   if(checkVersion != cacheVersion) {

    inProgress = true;
    // reload from cache
    .....
    // set data structures so that subsequent threads get
    // the new data
    ....
    // set the version.
    cacheVersion = newVersion;
    // we are done here.
    inProgress = false;
   } // end version recheck
  } // end lock.
 } // end version and inProgress check


Note that in order to prevent thread local caching and allow all threads to get immediate access to the data structures after they were reassigned, I declared the data structures that I was updating as volatile.

The simplification of the cache update process made it a whole lot easier to understand,  I think that the conditions -- specifically the eventual consistency -- allowed us some latitude in when reader threads actually got the data. There isn't a specific by the numbers approach to solving synchronization issues, but it always helps to understand what you are synchronizing, and more importantly,  what you don't have to synchronize.

Thursday, April 1, 2010

Using Google Maps API v3 + JQuery.

My side project to display and do some detailed data mining of my GPS exercise data, has been languishing for the better part of a year while my day job(s) have been taking most of my day and night time. Garmin has a pretty decent Mac desktop program that provides graphing, graph zooming, and stats by mile, but I'd rather have all of that functionality (and more!) via a web UI. I'd also like to integrate the results of the data mining into that UI in a useful way, i.e. mining relative effort over similar terrain to track actual fitness over time.

I decided that this project is going to be about having fun and nothing more, and as such decided to write the UI first, because all of my work these days is server side, Java, so 'fun' for me involves more dynamic languages, i.e. JavaScript and Ruby.

I wanted to display my gps data as a route on a map, which meant getting up to speed on the Google Maps API, and writing a quick dummy server that could dump out some route data for me.

I decided to use the latest Google Maps API v3 , and of course JQuery for the front end work, and mocked up a quick backend server using Sinatra. I can always redo that backend in something more robust once I want to actually deploy, but for now getting the data to the page is more important than how fast that data is retrieved, or machine resources consumed by serving that data.

Part 1: Displaying The Map


I needed to include the google maps api v3 js:

http://maps.google.com/maps/api/js?sensor=false
and the latest JQuery:
http://ajax.googleapis.com/ajax/libs/jquery/1.4.2/jquery.min.js

Then in the ready function, I created a map by pointing it to a div and passing in my options.

$(document).ready(function() {
    var myLatlng = new google.maps.LatLng(47.5615, -122.2168);
    var myOptions = {
          zoom: 12,
          center: myLatlng,
          mapTypeId: google.maps.MapTypeId.TERRAIN
    };
    var map = new google.maps.Map($("#map_canvas")[0], myOptions);
   
Note above that I'm assigning the map to a div with id = "map_canvas".

Part 2: Parsing the GPS data

Eventually I'd like to upload data directly from my device, but for now I'm going to skip that part and 'pretend' I've already done it. GPS data is exported in the TCX format, which is Garmin-proprietary, but is the easiest to use right now. My current desktop program has the ability to export 1 to N days worth of data into tcx.

At some point parsing tcx using a DOM based parser was going to start hurting, so I decided to use a SAX based parser from the start. My usual choice for quick n dirty XML/HTML parsing, hpricot, option was therefore not an option. I investigated nokogiri, but eventually settled on libxml, mostly because the rdoc on sax parsing was very clear, and it was much faster for sax parsing.


I mainly wanted to parse lat-long data out of the tcx file and dump the coordinates into another file in JSON format. Here is my 5 minute hacked together code:


class PostCallbacks
include XML::SaxParser::Callbacks

def initialize(write_file)
  @state="unset"
  @write_file = File.open(write_file,"w")
  @buffer = "{\"data\" : ["

end

def on_start_element_ns(element, attributes, prefix, uri, namespaces)

  if element == 'LatitudeDegrees'
   @state = "in_lat"
  elsif element == 'LongitudeDegrees'
   @state = "in_long"
  end
end

def on_characters(chars)
  if(@state=="in_lat")
   @buffer += "{\"lat\": #{chars}"
  elsif(@state == "in_long")
   @buffer += ", \"long\": #{chars}},"
  end
end

def on_end_element_ns(element,prefix,uri)

  @state="unset"
end

def on_end_document()

  @buffer = @buffer.slice(0,@buffer.length-1)
  @buffer += ("]}")
  @write_file.puts(@buffer)
  @write_file.close()
end
 

end

parser = XML::SaxParser.file(ARGV[0])
parser.callbacks = PostCallbacks.new(ARGV[1])
parser.parse


Part 3: Serving The GPS Data Up

My goal here was to basically dump that generated file as a response to an AJAX request. Sinatra is perfect for delivering quick services like this. I use Sinatra's DSL to handle a GET request as follows:

require 'rubygems'
require 'sinatra'
require 'open-uri'
require 'json'

  get '/sample_path.json' do
   content_type :json
   File.open("../output/out.json") do | file |
   file.gets
   end

  end


The ../output/out.json is where I parsed the lat-long data from the tcx file into.
I ended up spending a lot of time debugging my get method (http://localhost:7000/sample_path.json). In FF I was unable to get a response back from my GET request. I googled around and apparently there are some compatibility issues with firefox 3.6.2 and JQuery. I was however able get the code to work in Safari, and I'm considering downgrading to FF 3.5 because I haven't seen those kind of problems with that browser, and Firebug is an essential part of my debugging library.

Part 4: Drawing The Data On the Map
The JS code that made the request loads the results into google.maps.MVCArray, which it then uses to create a polyline superimposed on the map:


var url = "http://localhost:4567/sample_path.json";
$.ajax({
  type: "GET",
  url: url,
  beforeSend: function(x) {
   if(x && x.overrideMimeType) {
     x.overrideMimeType("application/json;charset=UTF-8");
  }
  },
  dataType: "json",
  success: function(data,success){

   var latLongArr = data['data'];
   var pathCoordinates = new google.maps.MVCArray();
   for(i = 0; i < latLongArr.length; i++) { 

     // each coordinate is put into a LatLng. 
     var latlng = new  google.maps.LatLng(latLongArr[i]['lat'],
        latLongArr[i]['long']); 
     pathCoordinates.insertAt(i,latlng); 
   } 
   // and this is where we actually draw it. 
   var polyOptions = { 
     path: pathCoordinates, 
     strokeColor: '#ff0000', 
     strokeOpacity: 1.0, 
     strokeWeight: 1 
   };
   
   poly = new google.maps.Polyline(polyOptions); poly.setMap(map); 
   } 
 }); 

The Result

Not super impressive, but a good start!