Tuesday, May 15, 2012

Calculating Conditional Probability via Mapreduce part 2: group by userID

In my last post I walked through the basics of conditional probability as a lead-in to discussing how I would actually implement it using mapreduce.

The whole calculation of conditional probability is a little contrived -- I admitted that up front -- my goal was to provide a 'non word count' example of mapreduce. It also requires multiple mapreduce steps. I'll provide the context around those steps to show how they link together to calculate conditional probability and correlation, both of which are required if we are going to recommend (or avoid recommending) products when a specific product is shown.
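Quick recap from the last post: for two pages A and B, conditional probability is P(B|A) = P(A and B) / P(A). Estimating those probabilities from view logs means knowing, per user, which pages were viewed and how often -- which is exactly what this first job produces.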

The entire source for all samples (as I write them) can be found at:

https://github.com/arunxarun/cpsamples


Starting Assumptions: the User View Log
The exact log format depends on who wrote the application, but if you were using something like Common Log Format, you would see the following kind of log:

127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /products/p1234.html HTTP/1.0" 200 2326

where:

  1. frank is the userid
  2. The timestamp is the bracketed next field
  3. The quoted string afterward contains the HTTP verb plus the URI plus the protocol version
  4. The next field contains the HTTP status code
  5. The final field is the size of the response in bytes

Mapreduce 1: group by UserID
The first mapreduce groups page views by user in the map phase and sums per-page view counts for each user in the reduce phase. The mapper parses each input line and emits the userid and URI as a pair into the context:
    
/**
 * the key is the user, the value is the URI
 *
 */
public static class GroupViewsByUserMapper extends Mapper<LongWritable, Text, Text, Text> {

  @Override
  public void map(final LongWritable key, final Text value, final Context context) {
    // assume tab-delimited common log format:
    // 127.0.0.1\t-\tfrank\t[10/Oct/2000:13:55:36 -0700]\t"GET /apache_pb.gif HTTP/1.0"\t200\t2326\n
    String raw = value.toString();

    String[] fields = raw.split("\t");

    if (fields.length < 6 || fields.length > 7) {
      context.getCounter(GroupByUserCounters.BAD_LOG_FORMAT).increment(1);
      return;
    }

    String userIdText = fields[2];
    String rawHttpInfo = fields[4];

    // split "GET /products/p1234.html HTTP/1.0" into verb, URI, and protocol
    String[] uriComponents = rawHttpInfo.split(" ");

    if (uriComponents.length < 3) {
      context.getCounter(GroupByUserCounters.BAD_URI_SUBFORMAT).increment(1);
      return;
    }

    Text userId = new Text(userIdText);
    Text uri = new Text(uriComponents[1]);

    try {
      context.write(userId, uri);
    } catch (IOException e) {
      LOGGER.error(e);
      context.getCounter(GroupByUserCounters.MAP_CONTEXT_WRITE_IO_EXCEPTION)
          .increment(1);
    } catch (InterruptedException e) {
      LOGGER.error(e);
      context.getCounter(GroupByUserCounters.MAP_CONTEXT_WRITE_INTERRUPTED_EXCEPTION)
          .increment(1);
    }
  }
}
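Note that the mapper doesn't throw on malformed input: bad lines increment a counter and get skipped, so a handful of mangled log entries won't fail the job, and the counter totals show up in the job status for gauging how dirty the input was. Also note the split on tabs -- it reflects how these particular logs were written; a standard space-delimited Common Log Format line would need a slightly smarter parse.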


The reduce phase aggregates counts per URI for each user into a text-based map. Note that while SequenceFiles are a more efficient way to write this information, we're using Text output to minimize complexity. Each output line looks like this:

user1\t/pages/foo.html:2\t/pages/bar.html:1\t/pages/baz.html:1

/**
 * emit user: uri1:count1...uriN:countN
 *
 */
public static class GroupViewsByUserReducer extends Reducer<Text, Text, Text, Text> {

  @Override
  public void reduce(Text key, Iterable<Text> input, Context context) {

    Map<Text, Integer> URItoCount = new HashMap<Text, Integer>();
    Iterator<Text> it = input.iterator();

    // aggregate views by uri. Note the copy: Hadoop reuses the same
    // Text instance across iterations, so we can't store it directly.
    while (it.hasNext()) {
      Text value = new Text(it.next());

      Integer count = URItoCount.get(value);

      if (count == null) {
        URItoCount.put(value, Integer.valueOf(1));
      } else {
        URItoCount.put(value, Integer.valueOf(count + 1));
      }
    }

    // emit uri:count format, tab delimited.
    StringBuffer sbuf = new StringBuffer();

    for (Map.Entry<Text, Integer> entry : URItoCount.entrySet()) {
      sbuf.append(entry.getKey().toString()).append(":").append(entry.getValue());
      sbuf.append("\t");
    }

    // convert the aggregated string to hadoop Text
    String viewBuffer = sbuf.toString();
    LOGGER.debug("reduce: key = " + key.toString() + ", value = " + viewBuffer);
    Text allViews = new Text(viewBuffer);

    try {
      context.write(key, allViews);
    } catch (IOException e) {
      LOGGER.error(e);
      context.getCounter(GroupByUserCounters.REDUCE_CONTEXT_WRITE_IO_EXCEPTION)
          .increment(1);
    } catch (InterruptedException e) {
      LOGGER.error(e);
      context.getCounter(GroupByUserCounters.REDUCE_CONTEXT_WRITE_INTERRUPTED_EXCEPTION)
          .increment(1);
    }
  }
}
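Before moving on, a quick word on wiring: the mapper and reducer above get submitted together as a single job. The following driver is a minimal sketch under the assumptions above (GroupViewsByUserDriver is a name I'm using here for illustration; the actual driver in the cpsamples repo may differ):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GroupViewsByUserDriver {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    Job job = new Job(conf, "group views by user");
    job.setJarByClass(GroupViewsByUserDriver.class);

    job.setMapperClass(GroupViewsByUser.GroupViewsByUserMapper.class);
    job.setReducerClass(GroupViewsByUser.GroupViewsByUserReducer.class);

    // both phases emit Text keys and Text values
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // input: raw view logs; output: user -> uri:count lines
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}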

A Final Note: Unit Testing
I'd like to point out some quick helper classes that I used to aid my unit testing -- these are at

https://github.com/arunxarun/cpsamples/tree/master/src/main/java/org/arunxarun/mapreduce/unitmocks.

I've started to use these (and/or variants of these) to quickly validate that my mappers and reducers are emitting what I think they should: the MockRecordWriter assertions below let me retrieve mapped values and validate them at map time:


@Test
public void testMapperValidInput() throws IOException, InterruptedException {

  GroupViewsByUser.GroupViewsByUserMapper mapper = new GroupViewsByUser.GroupViewsByUserMapper();

  MockRecordWriter<Text, Text> rw = new MockRecordWriter<Text, Text>();
  Mapper<LongWritable, Text, Text, Text>.Context context = getMapperContext(mapper, rw);

  LongWritable key = new LongWritable(1);
  Text value = new Text("127.0.0.1\t-\tfrank\t[10/Oct/2012:13:55:36 -0700]\t\"GET /products/prod2.html HTTP/1.0\"\t200\t1002\n");
  mapper.map(key, value, context);

  LongWritable key2 = new LongWritable(2);
  Text value2 = new Text("127.0.0.1\t-\tjoe\t[10/Oct/2012:14:35:22 -0700]\t\"GET /products/prod1.html HTTP/1.0\"\t200\t2326\n");
  mapper.map(key2, value2, context);

  Map<Text, Text> map = rw.getMap();
  assertNotNull(map);
  assertTrue(map.size() == 2);
  assertNotNull(map.get(new Text("joe")));
  assertNotNull(map.get(new Text("frank")));
}
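The MockRecordWriter referenced above is the key piece: a RecordWriter that just captures everything written to the context in a map the test can inspect. A minimal sketch of the idea -- the actual class in the repo may differ:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MockRecordWriter<K, V> extends RecordWriter<K, V> {

  private final Map<K, V> map = new HashMap<K, V>();

  @Override
  public void write(K key, V value) {
    // capture the emitted pair so the test can assert against it
    map.put(key, value);
  }

  @Override
  public void close(TaskAttemptContext context) {
    // nothing to clean up
  }

  public Map<K, V> getMap() {
    return map;
  }
}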


The getMapperContext() method uses most of these stubs (which return empty values) to build a context object that can be passed into the mapper:

/**
 * return a valid mapper context.
 *
 * @param mapper
 * @param rw
 * @return
 * @throws IOException
 * @throws InterruptedException
 */
private Mapper<LongWritable, Text, Text, Text>.Context getMapperContext(
    Mapper<LongWritable, Text, Text, Text> mapper,
    MockRecordWriter<Text, Text> rw) throws IOException, InterruptedException {

  Configuration conf = new Configuration();
  TaskAttemptID taskId = new TaskAttemptID();

  Mapper<LongWritable, Text, Text, Text>.Context context = mapper.new Context(
      conf, taskId, new MockRecordReader(), rw,
      new MockOutputCommitter(), new MockStatusReporter(),
      new MockInputSplit());

  return context;
}

The reducer logic is also testable:


@Test
public void testReducerValidInput() throws IOException, InterruptedException {

  GroupViewsByUser.GroupViewsByUserReducer reducer =
      new GroupViewsByUser.GroupViewsByUserReducer();

  Configuration conf = new Configuration();
  TaskAttemptID taskId = new TaskAttemptID("foo", 1, true, 1, 12);
  MockRecordWriter<Text, Text> dwr = new MockRecordWriter<Text, Text>();

  Reducer<Text, Text, Text, Text>.Context context = getReducerContext(reducer, dwr, conf, taskId);

  Iterable<Text> input = new Iterable<Text>() {

    @Override
    public Iterator<Text> iterator() {
      List<Text> list = new ArrayList<Text>();

      list.add(new Text("http://foobar.com"));
      list.add(new Text("http://foobar.com"));
      list.add(new Text("http://foobar.com"));
      list.add(new Text("http://foobar.com"));

      return list.iterator();
    }
  };

  Text key = new Text("userkey");

  reducer.reduce(key, input, context);

  Map<Text, Text> map = dwr.getMap();

  assertNotNull(map);
  assertTrue(map.size() == 1);
  assertNotNull(map.get(new Text("userkey")));
  Text value = map.get(key);
  assertEquals("http://foobar.com:4\t", value.toString());
}


Again, getReducerContext() returns a mocked-up context that is passed into the reducer and evaluated, and the MockRecordWriter can be interrogated for results. Here is that getReducerContext():


/**
 * return a valid reducer context.
 *
 * @param reducer
 * @param dwr
 * @param conf
 * @param id
 * @return
 * @throws IOException
 * @throws InterruptedException
 */
private Reducer<Text, Text, Text, Text>.Context getReducerContext(
    Reducer<Text, Text, Text, Text> reducer,
    MockRecordWriter<Text, Text> dwr,
    Configuration conf,
    TaskAttemptID id) throws IOException, InterruptedException {

  MockOutputCommitter doc = new MockOutputCommitter();
  MockStatusReporter dsr = new MockStatusReporter();
  MockRawKeyValueIterator drkv = new MockRawKeyValueIterator();

  Reducer<Text, Text, Text, Text>.Context context = reducer.new Context(
      conf, id, drkv, new MockCounter(), new MockCounter(), dwr,
      doc, dsr, null, Text.class, Text.class);

  return context;
}
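One caveat on these mocks: instantiating mapper.new Context(...) and reducer.new Context(...) directly like this only works against the older 0.20.x-era mapreduce API, where Context is a concrete inner class. Later Hadoop releases restructured those classes, and there you'd want a dedicated test harness like MRUnit instead.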

In my next post I'll discuss how to take the output of this job and generate item-to-item mappings.