Sunday, November 17, 2013

Hadoop Streaming with MRJob

Motivation to use Streaming:

Writing java map-reduces for simple jobs feels like 95% boilerplate, 5% custom code. Streaming is a much simpler interface into Mapreduce, and it gives me the ability to tap into of the rich data processing, statistical analysis and nlp modules of Python.

Motivation to use mrjob:

While the interface to Hadoop Streaming couldn't be simpler, not all of my jobs are simple 'one and done' map-reduces, and most of them require custom options MRJob allows you to configure and run a single map and multiple reduces.  It also does some blocking and tackling, allowing me to customize arguments and passing them into specified jobs. Finally, mrjob can be applied to an on prem cluster or an amazon cluster - and we are looking at running amazon clusters for specific prototype use cases.

mrjob and streaming hurdles

The mrjob documentation is excellent for getting up and running with a simple job. I'm going to assume that you have read enough to know how to subclass MRJob, set up a map and a reduce function, and run it.

I'm going to discuss some of the things that weren't completely obvious to me after I had written my first job, or even my second job. Some of these things definitely made sense after I had read through the documentation, but it took multiple reads, some debug attempts on a live cluster, and some source code inspection.

Hurdle #1: passing arguments

My first job was basically a multi dimensional grep: I wanted to walk input data that had timestamp information  a tab delimited field and only process those lines that were in my specified date range.  In order to do this  I needed two range arguments that took date strings to do the range check in the mapper.  I also wanted to be able to apply specified regex patterns to those lines at map time.  Because there were several regex patterns,  I decided to put them in a file and parse them. So I needed to pass three arguments into my job, and those arguments were required for every mapper that got run in the cluster.

In order to pass arguments into my job, I had to override the configure_options() method of MRJob and use add_passthrough_option() for the range values, and add_file_option() for the file that held the regexes:

def configure_options(self):
        super(HDFSUsageByPathMatch,self).configure_options()
        self.add_passthrough_option("--startDateRange",type='string',help='...')
        self.add_passthrough_option("--endDateRange",type='string',help='')
        self.add_file_option("--filters")

All options were passed straight through to my job from the command-line:

python job.py --startDateRange 01/01/13 --endDateRange 12/01/13 --filters filters.json

I referenced them in an init function of my job class, which subclassed the MRJob class:

class MyJob:
     ...
    def task_init(self):
        self.startDateRange = dateutil.parser.parse(self.options.startDateRange)
        self.endDateRange = dateutil.parser.parse(self.options.endDateRange)
        self.filters = parseJsonOptions(self.options.filters)

This init method was specified in the MyJob.steps() override of the default MRJob method:

def steps(self):
        return [


            self.mr(mapper_init = self.task_init,
            .....
        ]

Something to note here: In the code I had written during development,  I had neglected to really read the documentation and as a result I had previously done all validation of my custom args using a standard OptParse class in my main handler. This worked for me in inline mode, which is what I was developing in. It does not work at all when running the job on a cluster, and it took some source code digging to figure out. Do as I say, not as I do :) In hadoop mode, the main MRJob script file is passed to mapper and reducer nodes with the step parameter set to the appropriate element in the steps array. The entry point into the script is the default main, and MRJob has a set of default parameters it needs to pass through to the MRJob subclassed job class. Overriding parameter handling in main effectively breaks MRJob when it tries to spawn mappers and reducers on worker nodes. MRJob handles the args for you, and you need to let it handle all arg parsing, and pass custom arguments as passthrough or file options. 

Hurdle #2: passing python modules

This nuance has more to do with streaming than it does with mrjob. But it's worth understanding if you're going to leverage non-standard Python modules in your mapper or reducer code, and those modules have not been installed on all of your datanodes.

I was using the dateutil class because it makes parsing dates from strings super easy. On a single node, getting dateutil up and running is this hard:

easy_install python-dateutil

But when you're running a streaming job on a cluster, that isn't an option. Or, it wasn't an option for me because the ops team didn't give me sudoers permissions on the cluster nodes, and even if they did, I would have had to write the install script to ssh in, do the install, and roll back on error. Arrgh, too hard.

What worked for me was to
  1. Download  the source code
  2. Zip it up (it arrived in tar.gz)
  3. Change the extension of the zip file because files that end in .zip are automatically moved to the lib folder of the task's working directory
  4. Access  it from within my script by putting it into the load path: 
sys.path.insert(0,'dateutil.mod/dateutil')
import dateutil
...

I'm passing dateutil.mod as a file passed in via add_file_option() in  myjob.configure_options(). Leveraging the add_file_option() method puts dateutil.mod in the local hadoop job's working directory:

def configure_options(self):
        super(HDFSUsageByPathMatch,self).configure_options()
        ....
        self.add_file_option("--dateutil")

Three things to note from the above code: (1) dateutil.mod is the zip file, (2) I'm referencing a module within the zip file by it's path location in that zipfile, and (3) because I've renamed the file, it gets placed in the job working directory, which means it is on my path by default. 

This is how I pass dateutil.mod into the job:

python job.py ... --dateutil dateutil.mod

Hurdle #3 (not quite cleared): chaining reduces vs map-reduces

As mentioned in the doc, it's super easy to chain reduces to do successive filtering and processing. Simply specify your multiple reduces in the steps() override:

def steps(self):
        return [
            self.mr(mapper_init = self.task_init,
                mapper=self.mapper_filter_matches,
                combiner=self.combiner_sum_usage,
                reducer=self.reducer_sum_usage),
            self.mr(reducer_init = self.task_init,
                    reducer=self.reducer_filter_keys)

        ]    

I haven't found it necessary to run successive mapreduces -- successive reduces work just as well in the use cases I've tried. When chaining reduces to the end of your first mapreduce, you can specify the key value from the first mapreduce as the key value in the next reduce.

What is not easy at this time is the ability to save intermediate output to a non intermediate location. While doing that is relatively straightforward in 'inline' mode, the approach suggested in the link won't work in hadoop mode because MRJob is invoking the python script with the right --step-num argument based on what it sees in the steps() method.

I did read about the --cleanup option, but from what I understand the intermediate output dir of a complex job is based on a naming convention, not on something I can set. As this is somewhat of an edge case, I can work around it by chaining MRJob runs with Oozie.

Summary

What I've learned about MRJob is that while it does a great job of allowing you to set and pass options, and allows you to construct good workflows (assuming you don't care about intermediate output), it is so easy to use that I fell into the trap of believing that running local on my machine was equivalent to running on a hadoop cluster.

As I've found out several times above, that is not the case. For me the keys here are (1) let MRJob handle your job specific variables, (2) leverage the steps() method for your more complex flows, and (3) if you need to save intermediate output, chain your jobs using an external scheduler.

6 comments:

  1. I recently came across your blog on hadoop and have been reading along. I thought I would leave my first comment. I don’t know what to say except that I have enjoyed reading. Nice blog. I will keep visiting this blog very often.
    Hadoop Training in hyderabad

    ReplyDelete
  2. Thank you so much for sharing this great information. Today I stand as a successful hadoop certified professional. Thanks to Big Data Training

    ReplyDelete
  3. The information you have posted here is really useful and interesting too & here, I had a chance to gather some useful tactics in programming, thanks for sharing and I have an expectation about your future blogs keep your updates please.

    JAVA Training in Chennai | JAVA Training Institutes in Chennai

    ReplyDelete
  4. Hi admin thanks for sharing informative article on hadoop technology. In coming years, hadoop and big data handling is going to be future of computing world. This field offer huge career prospects for talented professionals. Thus, taking Hadoop Training in Chennai will help you to enter big data technology.

    ReplyDelete
  5. I am following your blog from the beginning, it was so distinct & I had a chance to collect conglomeration of information that helps me a lot to improvise myself, Thanks for sharing...
    Regards,
    ccna course in Chennai|ccna training in Chennai|ccna training institute in Chennai

    ReplyDelete
  6. All are saying the same thing repeatedly, but in your blog I had a chance to get some useful and unique information, I love your writing style very much, I would like to suggest your blog in my dude circle, so keep on updates…
    Regards
    Angularjs training in chennai|Angularjs training chennai|Angularjs course in chennai

    ReplyDelete