Sunday, November 17, 2013

Hadoop Streaming with MRJob

Motivation to use Streaming:

Writing Java map-reduces for simple jobs feels like 95% boilerplate, 5% custom code. Streaming is a much simpler interface into MapReduce, and it gives me the ability to tap into the rich data processing, statistical analysis and NLP modules of Python.

Motivation to use mrjob:

While the interface to Hadoop Streaming couldn't be simpler, not all of my jobs are simple 'one and done' map-reduces, and most of them require custom options. MRJob allows you to configure and run a single map and multiple reduces. It also does some blocking and tackling, allowing me to customize arguments and pass them into specified jobs. Finally, mrjob can be run against an on-prem cluster or an Amazon cluster - and we are looking at running Amazon clusters for specific prototype use cases.

mrjob and streaming hurdles

The mrjob documentation is excellent for getting up and running with a simple job. I'm going to assume that you have read enough to know how to subclass MRJob, set up a map and a reduce function, and run it.

I'm going to discuss some of the things that weren't completely obvious to me after I had written my first job, or even my second job. Some of these things definitely made sense after I had read through the documentation, but it took multiple reads, some debug attempts on a live cluster, and some source code inspection.

Hurdle #1: passing arguments

My first job was basically a multi-dimensional grep: I wanted to walk input data that had timestamp information in a tab-delimited field and only process those lines that fell in my specified date range. In order to do this, I needed two range arguments that took date strings to do the range check in the mapper. I also wanted to be able to apply specified regex patterns to those lines at map time. Because there were several regex patterns, I decided to put them in a file and parse them. So I needed to pass three arguments into my job, and those arguments were required by every mapper that got run on the cluster.

In order to pass arguments into my job, I had to override the configure_options() method of MRJob and use add_passthrough_option() for the range values, and add_file_option() for the file that held the regexes:

def configure_options(self):
    super(HDFSUsageByPathMatch, self).configure_options()
    self.add_passthrough_option("--startDateRange", type='string', help='...')
    self.add_passthrough_option("--endDateRange", type='string', help='')
    self.add_file_option("--filters")

All options were passed straight through to my job from the command-line:

python job.py --startDateRange 01/01/13 --endDateRange 12/01/13 --filters filters.json

I referenced them in an init function of my job class, which subclassed the MRJob class:

class MyJob(MRJob):
    ...
    def task_init(self):
        self.startDateRange = dateutil.parser.parse(self.options.startDateRange)
        self.endDateRange = dateutil.parser.parse(self.options.endDateRange)
        self.filters = parseJsonOptions(self.options.filters)
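
parseJsonOptions() is just a small helper of mine. A minimal sketch of what it boils down to, assuming the filters file is a plain JSON document, would be:

import json

def parseJsonOptions(path):
    # The filters file travels with the job via add_file_option(), so it
    # can be opened by path both locally and in each task's working
    # directory on the cluster.
    with open(path) as f:
        return json.load(f)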

This init method was specified in the MyJob.steps() override of the default MRJob method:

def steps(self):
    return [
        self.mr(mapper_init=self.task_init,
                .....
    ]

Something to note here: in the code I had written during development, I had neglected to really read the documentation, and as a result I had done all validation of my custom args using a standard optparse OptionParser in my main handler. This worked for me in inline mode, which is what I was developing in. It does not work at all when running the job on a cluster, and it took some source code digging to figure out why. Do as I say, not as I do :) In hadoop mode, the main MRJob script file is passed to the mapper and reducer nodes with the step parameter set to the appropriate element in the steps array. The entry point into the script is the default main, and MRJob has a set of default parameters it needs to pass through to the subclassed job class. Overriding parameter handling in main effectively breaks MRJob when it tries to spawn mappers and reducers on worker nodes. MRJob handles the args for you: let it handle all arg parsing, and pass custom arguments as passthrough or file options.
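
For reference, the bottom of the job file ends up being nothing more than a hand-off to MRJob -- a minimal sketch, using the MyJob class from above:

if __name__ == '__main__':
    # Let MRJob own all argument parsing; custom options belong in
    # configure_options(), not in a hand-rolled OptionParser here.
    MyJob.run()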

Hurdle #2: passing python modules

This nuance has more to do with streaming than it does with mrjob. But it's worth understanding if you're going to leverage non-standard Python modules in your mapper or reducer code, and those modules have not been installed on all of your datanodes.

I was using the dateutil module because it makes parsing dates from strings super easy. On a single node, getting dateutil up and running is this hard:

easy_install python-dateutil

But when you're running a streaming job on a cluster, that isn't an option. Or, it wasn't an option for me because the ops team didn't give me sudoers permissions on the cluster nodes, and even if they did, I would have had to write the install script to ssh in, do the install, and roll back on error. Arrgh, too hard.

What worked for me was to
  1. Download the source code
  2. Zip it up (it arrived as a tar.gz)
  3. Change the extension of the zip file, because files that end in .zip are automatically moved to the lib folder of the task's working directory
  4. Access it from within my script by putting it on the load path:
import sys
sys.path.insert(0, 'dateutil.mod/dateutil')
import dateutil
...
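
Steps 2 and 3 can be scripted, too. Here's a rough sketch, assuming the tarball has been unpacked into a local directory named dateutil/ (so the actual package ends up at dateutil/dateutil inside the archive, which is why the sys.path line above points there):

import os
import zipfile

# Zip the unpacked source tree into an archive whose name doesn't end in
# .zip, so Hadoop leaves it in the task's working directory instead of
# moving it into lib/.
with zipfile.ZipFile('dateutil.mod', 'w') as zf:
    for root, _, files in os.walk('dateutil'):
        for name in files:
            path = os.path.join(root, name)
            zf.write(path)  # entries keep their 'dateutil/...' prefix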

I pass dateutil.mod in as a file via add_file_option() in myjob.configure_options(). Leveraging the add_file_option() method puts dateutil.mod in the local hadoop job's working directory:

def configure_options(self):
    super(HDFSUsageByPathMatch, self).configure_options()
    ....
    self.add_file_option("--dateutil")

Three things to note from the above code: (1) dateutil.mod is the zip file, (2) I'm referencing a module within the zip file by its path location in that zip file, and (3) because I've renamed the file, it gets placed in the job's working directory, which means I can reference it with a relative path from my script.

This is how I pass dateutil.mod into the job:

python job.py ... --dateutil dateutil.mod
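
Putting both hurdles together, the full invocation looks roughly like this (input paths and runner options elided, as above):

python job.py --startDateRange 01/01/13 --endDateRange 12/01/13 --filters filters.json --dateutil dateutil.mod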

Hurdle #3 (not quite cleared): chaining reduces vs map-reduces

As mentioned in the doc, it's super easy to chain reduces to do successive filtering and processing. Simply specify your multiple reduces in the steps() override:

def steps(self):
    return [
        self.mr(mapper_init=self.task_init,
                mapper=self.mapper_filter_matches,
                combiner=self.combiner_sum_usage,
                reducer=self.reducer_sum_usage),
        self.mr(reducer_init=self.task_init,
                reducer=self.reducer_filter_keys)
    ]

I haven't found it necessary to run successive map-reduces -- successive reduces work just as well in the use cases I've tried. When chaining reduces onto the end of your first map-reduce, the keys emitted by that first map-reduce become the keys seen by the next reduce.
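
To make that concrete, here's a rough sketch of how the two reducers in the steps() above chain together. Only the method names come from my actual job; the bodies, and the filtering predicate in particular, are illustrative:

def reducer_sum_usage(self, key, values):
    # First-stage reducer: roll up the usage counts emitted for each key
    # by the mapper/combiner.
    yield key, sum(values)

def reducer_filter_keys(self, key, totals):
    # Second-stage reducer: the (key, summed value) pairs emitted by
    # reducer_sum_usage arrive here keyed the same way, so this pass can
    # prune output without another map. The threshold is a placeholder.
    for total in totals:
        if total >= 1:
            yield key, total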

What is not easy at this time is the ability to save intermediate output to a non-intermediate location. While doing that is relatively straightforward in 'inline' mode, the approach suggested in the link won't work in hadoop mode, because MRJob invokes the Python script with the right --step-num argument based on what it sees in the steps() method.

I did read about the --cleanup option, but from what I understand the intermediate output dir of a complex job is based on a naming convention, not on something I can set. As this is somewhat of an edge case, I can work around it by chaining MRJob runs with Oozie.

Summary

What I've learned about MRJob is that while it does a great job of allowing you to set and pass options, and lets you construct good workflows (assuming you don't care about intermediate output), it is so easy to use that I fell into the trap of believing that running locally on my machine was equivalent to running on a hadoop cluster.

As I found out several times above, that is not the case. For me the keys here are (1) let MRJob handle your job-specific variables, (2) leverage the steps() method for your more complex flows, and (3) if you need to save intermediate output, chain your jobs using an external scheduler.

Friday, November 8, 2013

Innovation -- trying to break out beyond the buzzword

Innovation is the poster child of buzzword bingo.


It's hard not to have an allergic reaction to people that talk about it, because you can't talk about innovation and do it at the same time.

So why am I talking about Innovation instead of doing it? :)

A couple of months back, when we were revamping our development process, basically going from 'Scrum-in-name-only' to something much more genuine (and I've got to do a post on that), we wanted to give people a block of time to do something completely different from their day jobs. We wanted them to work with different people, outside of their usual teams, on ideas that they (not we) thought of. We wanted to break down some of the walls that naturally occur when you section large teams into smaller units to get work done efficiently.

We're sitting on some amazing data and have built some great infrastructure to manage it. These people are on the teams that work with that data and use that infrastructure day in and day out. They're smart. I know they have ideas for new data products, or tools to make getting insights easier, but no time to actually work on them. Most importantly, I know their ideas are good ones, because I've seen multiple people make those ideas happen in spite of having no time to work on them. We have products that we've built because people have championed their ideas into the delivery stream. I wanted to make that easier. You shouldn't have to be Rocky Balboa to get a good idea off the ground.

In other words, that kind of effort shouldn't happen on nights and weekends, against all odds -- we need to reward that kind of creativity during business hours -- while balancing the delivery needs of the business.

'Innovation Week' is our collective attempt to do just that. One week a quarter is enough time to stop business as usual and try something completely different. Innovation Week is very much an experiment, one that could go well....or not.

The overall plan:

  1. Before:
    1. Announce the week.
    2. Send out a 'request for ideas' email.
    3. Review ideas in as many sessions as we need:
      1. The idea 'author' presents their idea canvas.
      2. We go over the canvas, ask questions, offer suggestions.
  2. During:
    1. Everyone sells their idea.
      1. Key to the selling: they need to ask for help where they need it.
    2. People provide their first, second, and third choices.
    3. We assign people to ideas -- the reason we aren't just going to let people choose is that we don't want imbalanced teams, and we want to make sure groups are diverse.
    4. The teams work on the ideas -- we are available to unblock any issues and provide guidance if asked.
  3. After:
    1. Every team presents their work.
    2. The group stack-ranks all ideas.
    3. The top 3 get prizes.
    4. The management team gives separate awards for:
      1. Business Value
      2. Completeness of Effort
      3. Disruptiveness (of the idea, of the technology being used, etc.)


As the saying goes: 'no battle plan survives first contact with the enemy'. I was fairly nervous. What if no one had ideas? What if the team couldn't care less? What if they were as allergic to the I-word as I was?

Our first idea review meeting was last night. Instead of the 1 or 2 ideas we had predicted, we have (at last count) six. Instead of the vague, tech-aspirational ideas we thought we were going to see -- things in the vein of 'I want to play with technology X, here is a contrived attempt to justify that' -- we saw carefully thought-out solutions to problems our team was either working around or about to go through.

The discussion around the ideas was very positive and constructive -- the ideas that were presented got a lot of feedback and suggestions about how they could be better. The best part was getting individuals to see that they had good ideas and that exposing them to the group would make those ideas better. The best moment was when one of the most quiet, most unassuming engineers got up and proceeded to unveil a completely awesome idea that was completely out of the box and completely powerful. At that point the energy in the room jacked up like a big wave.

After a while, work becomes work. We're lucky enough to be in a profession that requires as much creativity as it does precision. I wanted to put some meaning into what has become a term that is only applied with heavy irony.

We are early on in the process. I am going to document how this first Innovation Week goes -- expecting the unexpected, of course.

Right now, as noted, we are at the beginning. The management team has put a lot of work into setting up the idea generation, and we need to follow through by setting the teams up for success, picking the best ideas, then ruthlessly evangelizing those up the chain. It's a long journey but I think we made a great first step.