Friday, September 24, 2010

Datamining my GPS/HRM Data, Step 2: Pig for ETL

Overview
Now that I've extracted three years' worth of GPS/HRM data into CSV format, I want to get some basic summary information. Specifically, I want:
  • the total distance covered for running, biking and 'other' (usually skate skiing) per month. 
  • average run and ride mileage per month
  • average running and cycling pace per month
Pretty no-brainer stuff that I could probably handle with a quick 'n dirty Ruby script for my paltry 40MB of data.

But...I want to use Pig. While this may seem like a candidate for the "Cutting Butter With A Chainsaw" award, I'm actually trying to show that Pig scripting is pretty damn useful for ETL. The nice thing about Pig is that when I'm dealing with 40GB or even terabytes of input data, the same script can be run across a multi-node cluster without changes to the basic logic.

I'm also going to try to capture what I've learned from being in the trenches with Pig over the last couple of months. We use it at work to process TBs of log data, and while it definitely has its warty aspects, I find that it is well suited for basic summary, grouping, and filtering operations.

Pig Setup
I downloaded and set up Pig as described here. Pig assumes that you've got Hadoop installed and, specifically, that DFS is started. I start DFS from the bin directory of my Hadoop install: start-dfs.sh.
I'm running Pig in pseudo-local mode (with my local box set up as a single-node Hadoop cluster):

pig -f pigfile.pig

I symlinked the pig shell script to /usr/bin/pig for ease of use. I have HADOOP_HOME and JAVA_HOME defined; the pig shell script uses both and assumes that it is located in the bin directory of a standard Pig install.

Pig Latin, UDFs, and Tuples
Pig scripts are written in Pig Latin. I'm only going to cover the small subset of Pig Latin used in this script; see the full references, part I and part II, for comprehensive overviews. Where Pig Latin falls short, Pig has User Defined Functions (UDFs) that allow loading, storing, and transforming data. UDFs are written in Java and extend defined interfaces.

Pig Latin and UDFs manipulate tuples to generate sets of tuples. Tuples are groups of 1..N values, where a value can be an int, a float, a string (chararray), another tuple, or a bag (set) of tuples. In Pig, you see a lot of
   X = {do something in pig on} Y;

where Y is an original set of tuples and X is the transformed result. Transformations are executed as map-reduce jobs; the complexity of what you are transforming determines how many map-reduce jobs are run.
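As a trivial, made-up example of the pattern (the relation Y and its hr field are hypothetical, not part of this post's script), filtering and counting looks like this:

-- Y is a relation (a set of tuples) that has already been loaded
X = FILTER Y BY hr > 150;               -- keep only tuples with a heart rate above 150
Z = GROUP X ALL;                        -- gather the surviving tuples into a single bag
counts = FOREACH Z GENERATE COUNT(X);   -- aggregates like COUNT trigger a map-reduce job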

The Goods

Load the data.


I'm loading the detail files generated from the XML->CSV transform I did last time. I loaded all detail files into Pig using the default PigStorage() load UDF, specifying the schema of the fields in each row:
A = LOAD '/csv/input/details*' USING PigStorage() AS (activityId:chararray, lapId:chararray, time:chararray, latitude:float, longitude:float, alt:float,dist:float,hr:int);

This gives me a relation, A, whose tuples contain the fields specified above; it represents all of the rows of loaded data. Also note that I used a wildcard to load all of the detail files at once. The load location can also be a comma-separated list of paths, which is really handy when loading from multiple locations.

Note that the locations above are in HDFS.  I uploaded all of my data into HDFS using the hadoop fs -put command.

Extract the month from the timestamp using a custom UDF.


Right now I've got the timestamp formatted as a character array. I'm going to need to extract the month and add it to the list of tuple fields in order to get monthly averages. This is where the built-in functionality of the Pig script falls short and a UDF comes in.

UDFs, as mentioned before, are used to load, store, and transform/extract data. I'm going to write one that extracts the month from the time string as an integer between 1 and 12. Note in the listing below that I broke the actual formatting out into a separate helper (DateUtils) that I could unit test without having to create a Tuple instance.


package com.infovoracious.udfs;

import java.io.IOException;
import java.util.Calendar;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class GetMonth extends EvalFunc<Integer> {

  // the UDF is invoked as extract_month(time), so the time string
  // is the only field in the input tuple
  private static final int TIME_INDEX = 0;

  @Override
  public Integer exec(Tuple input) throws IOException {
    if (input == null || input.size() == 0)
      return null;
    try {
      // looks like this: 2010-02-24T14:22:29Z
      String str = (String) input.get(TIME_INDEX);

      // DateUtils.dateFromFormattedTime is my helper (not shown) that parses the
      // timestamp and returns the requested Calendar field; it is unit tested separately.
      return DateUtils.dateFromFormattedTime(str, Calendar.MONTH) + 1;
    } catch (Exception e) {
      throw new IOException("Caught exception processing input row ", e);
    }
  }

}

I also created a UDF, GetWeekOfMonth, that extracts the week of the month. I'm storing both the month and the week of the month as additional columns.

Load the UDFs.

In order for my script to see the UDFs I've just written, I need to load the jar that they reside in and alias them so that they can be called from the script.

This is done with the REGISTER keyword at the top of the script, which loads the jar, and the DEFINE keyword, which aliases each UDF. Note that when I define a UDF, I'm specifying its constructor; in the cases below, GetMonth and GetWeekOfMonth both have default (no-argument) constructors.

REGISTER foo.jar;
DEFINE extract_month com.infovoracious.udfs.GetMonth();
DEFINE extract_week com.infovoracious.udfs.GetWeekOfMonth();

I invoke the methods like this:

-- the load from last time
A = LOAD '/csv/input/details*' USING PigStorage() AS (activityId:chararray, lapId:chararray, time:chararray, latitude:float, longitude:float, alt:float,dist:float,hr:int);

-- adding columns to a tuple.
B = FOREACH A GENERATE *, extract_month(time) as month, extract_week(time) as week;
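To sanity-check the result at this point, Pig's DESCRIBE operator will print B's schema; it should show the original eight fields plus the new month and week columns (the exact types shown depend on what the UDFs declare):

DESCRIBE B;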


Group by month and week of month.
Now that we have the columns, we can group by them. Grouping by (month, week) collects all of the rows that share the same unique (month, week) pair:

C = GROUP B BY (month,week);
This creates one tuple per unique (month, week) pair, each of which looks like this:
{month, week}, array length 1..N of {activityId,lapId,time,latitude,longitude,alt,dist,hr}
Actual values would look like this:

{1,1},[{..},{..},..]
{1,2},[{..}...]
...
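To eyeball a small sample of what the grouping produced, DUMP writes a relation to the console; that's fine on my pseudo-local setup, but not something you'd want to do against a full-size dataset:

DUMP C;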

Summarize data.
With the rows now grouped, I can summarize the data in the grouped tuples. I want to total mileage per week, which I can do as follows (showing the whole script for continuity):


-- the load from last time
A = LOAD '/csv/input/details*' USING PigStorage() AS (activityId:chararray, lapId:chararray, time:chararray, latitude:float, longitude:float, alt:float,dist:float,hr:int);

-- adding columns to a tuple.
B = FOREACH A GENERATE *, extract_month(time) as month, extract_week(time) as week;

C = GROUP B BY (month,week);

-- the tuple now looks like group(month,week) B(tuple contents), 
-- so I reference dist as a member of the B tuple.

D = FOREACH C GENERATE *, SUM(B.dist) as total_dist;
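Other aggregates work the same way. For example, average heart rate and the number of trackpoints per (month, week) group, using the built-in AVG and COUNT functions (a sketch, not part of the script above):

D2 = FOREACH C GENERATE group, AVG(B.hr) as avg_hr, COUNT(B) as num_trackpoints;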


Flatten the grouping columns.
Using the FLATTEN keyword to remove tuple nesting has different effects depending on where you use it. If you use FLATTEN to flatten the grouped columns, it merely removes the bag from around the grouped columns:
{month, week}, [array of tuples]
becomes
month, week, [array of tuples].
Flattening out the array generates a new row for each array element:
1,2,[{a,b},{c,d}]
becomes
1,2,a,b
and
1,2,c,d
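In Pig terms, those two cases look roughly like this (sketches against the C relation from above, not statements in the final script):

F1 = FOREACH C GENERATE FLATTEN(group), B;           -- un-nests only the grouping tuple
F2 = FOREACH C GENERATE FLATTEN(group), FLATTEN(B);  -- also un-nests the bag: one output row per original record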
In this case I want to flatten out the grouping columns:

-- the load 
A = LOAD '/csv/input/details*' USING PigStorage() AS (activityId:chararray, lapId:chararray, time:chararray, latitude:float, longitude:float, alt:float,dist:float,hr:int);

-- adding columns to a tuple.
B = FOREACH A GENERATE *, extract_month(time) as month, extract_week(time) as week;

-- grouping
C = GROUP B BY (month,week);

-- summing, keeping only the grouping columns and the total (everything else is discarded)
D = FOREACH C GENERATE group, SUM(B.dist) as total_dist;

-- flattening:
-- note how I'm referring to the grouping columns by the 'group' special keyword.
-- note how total_dist is not scoped by an enclosing tuple b/c of the way it was generated above.
E = FOREACH D GENERATE FLATTEN(group),total_dist;

Store it.
Finally, I want to store the results of my work. This is done using the default PigStorage() store UDF:

STORE E INTO '$some_dir';

Note that I didn't need to specify 'USING PigStorage()'. I also used a parameter ($some_dir), which I pass into the pig script as a key-value pair, like this:

pig -p some_dir=foo...

And that's it! I've summed my mileage for each week of each month over the last three years, in six lines of script!
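For reference, here is the whole pipeline in one place (the REGISTER and DEFINE lines from earlier go at the top of the same file):

A = LOAD '/csv/input/details*' USING PigStorage() AS (activityId:chararray, lapId:chararray, time:chararray, latitude:float, longitude:float, alt:float,dist:float,hr:int);
B = FOREACH A GENERATE *, extract_month(time) as month, extract_week(time) as week;
C = GROUP B BY (month,week);
D = FOREACH C GENERATE group, SUM(B.dist) as total_dist;
E = FOREACH D GENERATE FLATTEN(group), total_dist;
STORE E INTO '$some_dir';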

Conclusion
Pig is a powerful tool to clean up and perform basic operations on data.  When you know what you need to do, and have to do it on a lot of data, it works remarkably well. That said, there are a couple of things that start to hurt over time:
  1. When you write a lot of Pig, you start wanting to re-use sections of script, i.e. make them functions. Except you can't. Yet. So you end up either cutting and pasting (bad), or saving variables off for later use (confusing when you try to use them later), instead of writing script-based UDFs (preferred).
  2. There is no conditional execution of an operation. I can't test a variable's state and then execute one statement or another based on that variable. What this means in real-world use is that I do all of my conditional testing in bash before invoking the Pig script, and build up variables based on the results of those tests. Those variables then get used by the Pig operations.
  3. The default Pig serialization format worked fine for the data above, because none of the field values contained the delimiter character. You can change the delimiter (see the sketch after this list), but that raises the question of what happens when someone injects your delimiter into the data you are trying to process.
  4. Higher-level workflow is not solved with Pig. We started out running Pig via a set of cron jobs, which soon turned into an admin's worst nightmare. Besides being hard to maintain, we were also not taking advantage of the actual sources and destinations of Pig data to schedule the work. In other words, if Pig job A produced intermediate format B and then continued running, we had no way of starting Pig job C, which loads format B, until Pig job A completed. We are currently evaluating Oozie at work to see if it can improve on cron+bash (a low bar, I know).
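For item 3, the delimiter is just a constructor argument to PigStorage(); the path and schema below are made up, and changing the delimiter still doesn't solve the escaping problem:

A = LOAD '/some/pipe/delimited/file' USING PigStorage('|') AS (f1:chararray, f2:int);
STORE A INTO '/some/output/dir' USING PigStorage('|');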
Warts and all, Pig sure beats writing map-reduce jobs for simple grouping/aggregation work. We still write raw map-reduce jobs when we are doing more complex operations, and our ETL job flows end up looking like a combination of Pig, raw map-reduce, and HDFS access. But Pig really gets us away from a lot of boilerplate work (and maintenance!) when processing the data, which gives us more time to analyze it.

Next
Of course, just getting summary statistics on my data hasn't really scratched the itch that started me off in the first place. While I now have an idea of my general fitness month to month, I still don't know enough about the amount of hard versus easy running/cycling I did, or how fast I was going when I was going hard or easy. Answering those questions will help me determine my fitness, which I previously defined as heart rate vs. pace, accounting for terrain. In order to get a metric for that definition of fitness, I need to define what 'hard' and 'easy' really are for me, and analyze how those workouts have changed for the better or worse over time.

Tuesday, September 7, 2010

Data Mining My GPS/HRM Data: Step 1, Formatting the Data

I've been wanting to analyze the data from my Garmin 305 for a while now. I've been a casual runner/biker and a hardcore data geek for years, and last year I started doing triathlons, which means even more data to analyze. While I've always been curious, I just haven't had a great 'need' to analyze it until now...

I'm switching from a primarily heart-rate-based training program to a 'pace-based' training program. The former had me training within specific heart rate zones; the latter has me running at specific pace ranges. I'm a 'measurer', and I'm curious to see how effective (or not!) the pace-based training program is. Fortunately I have one device that tracks both heart rate and pace, so I can compare the effect of the pace-based training to the effect of the heart-rate-based training on my overall fitness.

In this case I'm measuring fitness as heart rate vs. pace, accounting for the terrain covered (i.e. hilly vs. flat). In order to measure my fitness up to now and going forward, I need to answer the following questions:
  1. how much time did I spend in 'recovery' mode, where my heart rate was < 70% of max?
  2. how much time did I spend in 'pain cave' mode, where my heart rate was > 85-90% of max?
  3. how much faster (or slower) did I get at the same heart rate over the last year?
  4. for the new pace-based program, how much time am I spending at the different paces, i.e. recovery pace, base pace, marathon pace, 1/2 marathon pace, 10K pace, 5K pace, 1 mile pace?
  5. what is my average heart rate at those paces?
  6. how much faster (or slower) am I getting?
I'm hoping to answer these questions using several approaches and several technologies that I've been using at work, and others that I've been itching to try. 

The first thing I needed to do prior to any analysis was to get the data into a format that I could easily operate on. The data is exported from the device in a format called TCX, which is schema-validated XML.

I need the data in CSV format, mainly because the tools I want to process the data with are all Hadoop based, and while I've read that it is not only possible but easy to process XML with Hadoop, Hadoop works best with CSV-style formats. XML is a nice format for nested data, and this is nested data, with the following structure:
  • activities
    • activity
      • laps
        • lap -- contains summary averages from trackpoint data (see below)
          • trackpoints
            • trackpoint -- contains snapshot heart rate, altitude, distance, etc.
XML is especially good when there are optional attributes. CSV tends to suck with optional attributes, because nothing is optional. In this case none of the attributes are optional, so ultimately XML is overkill for storing this data.

I collapsed this structure into two CSV files, summary data and detail data, because I planned to act on summary and detail data separately.

The summary data contains lap summary data:
  • activity id, lap id, total time, total distance, max speed, max heart rate, average heart rate, calories, number of trackpoints.
The detail data contains trackpoint data:
  • lap id, time, latitude, longitude, altitude, distance, heart rate
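(For what it's worth, the summary format maps directly onto a Pig schema, just like the detail data does in the post above; the path and field names in this sketch are my own guesses.)

S = LOAD '/csv/input/summary*' USING PigStorage() AS (activityId:chararray, lapId:chararray, totalTime:float, totalDist:float, maxSpeed:float, maxHr:int, avgHr:int, calories:int, numTrackpoints:int);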
To do the conversion, I used the Ruby libxml SAX parser. To use it, I needed to create a callback handler that implements the methods I want to override.


class PostCallbacks
  include XML::SaxParser::Callbacks

  def on_start_element_ns(element, attributes, prefix, uri, namespaces)
  ...
  end


  def on_characters(chars)
  ...
  end


  def on_end_element_ns (name, prefix, uri)
  ...
  end
end

In the callback handler, I maintained state to track the nested XML objects. Typically I would assign state in the on_start_element_ns() method, act on that state in the on_characters() method, and release state in the on_end_element_ns() method. I would also flush my results to disk periodically to avoid taking up an unreasonable amount of memory.

I had about 40MB of data from the previous three years, which was parsed into CSV files in approximately 44 seconds. I'm more than happy with that performance right now, because this is essentially a one-off job to get the data.

Next Up: setting up a data processing pipeline using Pig.