Friday, November 16, 2012

What is an Enterprise Data Platform, Anyway?

I've been trying to write about my requirements for a Data Platform for the last month. The problem is that I was trying to write this at the exact same time that my understanding of both the requirements and the Platform was shifting, so I ended up writing the requirements, coming back to them a week later, throwing them all out, and rewriting them again. This is revision number three, and I feel that my definition has finally stabilized. The biggest change between now and a month ago is that I realized that I am actually thinking about an Enterprise Data Platform, which is different from a Data Platform.

First of all: What is a Data Platform? Here is a first crack at a definition: a Data Platform enables people to leverage data by facilitating data collection, storage, transformation, and access. In other words, I need to be able to get data in, persist it, access it, change it, and still access it. All of that is completely possible with {name your nosql storage engine of choice}. So why am I not calling {name your nosql storage engine of choice} an Enterprise Data Platform?

I have a short answer that is best explained with a longer one. The short answer: Enterprise Data Platforms have one more requirement: the ability to manage all of that data.

The long answer: my experiences with databases and persistence technology started from the perspective of a developer in a startup, not an administrator or data modeler in an enterprise. I wanted to successfully collect, store, transform, and retrieve data, in order to fulfill the functional requirements of the applications I wrote. I found that a lot of the database admins and data modelers/architects I worked with actually got in the way of this fairly simple mission.

A lot of the constructs and restrictions imposed on the data model and the database by both parties seemed arbitrary, and when I asked them for their reasoning, their answers had no relevance to my single data set use case.  So, when I discovered Hadoop while working around a complex set of ETLs that had ground our database to a halt in 2009, I was elated, because both parties were sidelined and I could do what I needed to do. Their apoplectic reactions to 'schema on read' used to fill my heart with joy.  That makes my current perspective very ironic.

The change in perspective happened when I changed jobs. I went from being a developer that used Hadoop to deliver solutions to an architect/product manager position where I led a team that operationalized Hadoop, Cassandra, Mongo, and Solr and then started to store and curate enterprise level data as a centralized service offering. The moment I started thinking past pure functionality and more about operations, I started to care about a lot of the things that people who manage data -- the people I used to disparage -- have been caring about for a while.  The moment I became a data curator or steward of a set of very diverse and valuable data, I started to see real value in Data Governance, which was a term that previously made me roll my eyes in impatient disgust.

Data Governance is defined on Wikipedia as a set of processes around "data quality, data management, data policies, business process management, and risk management surrounding the handling of data in an organization". As a developer in a startup tasked with doing a few things very well, I couldn't care less about data governance. I know my data; it may be vast, but it has a single schema that I know and can code for.

However, as a member of a large enterprise storing many kinds of data who wants to use some of that data, I now must rely on data quality, definition, and security. Without those three I may be able to generate some value from that data, but the value is undermined by the lack of defined quality of the inputs, the lack of defined standards to normalize the data to, and the lack of security which means that the data could be compromised by a rogue user/process. Those 'constraints' on enterprise data are in place because they have a direct impact on the bottom line.

Breaking down Data Governance from the above definition, I get the following:
  1. Data resources must be discoverable: there must be a defined set of metadata that analysts can use to search for data. 
    • common metadata is most effective when there is a common data model, which becomes hard to enforce when an enterprise spans many different business units. 
  2. Data structure must be describable so that analysts can consume it. 
  3. Data Quality must be known and identified every time new data is ingested. 
    • When I load data from an external source, downstream processes rely on it conforming to a schema. I need to score the data by how much of it conforms to that schema (a minimal scoring sketch follows this list).
    • When Data Quality dips below a defined level, it must be treated as an operational issue. 
  4. Data Replication must be defined and enforceable.
    • Storage systems must allow users to set a replication factor to account for storage failures. 
  5. Data Replication policies must also be definable, enforceable, and applicable per unique data type.
    • Replicating full data sets may be a business requirement: if so, a Data Platform must replicate data along the following dimensions: resource location (where) and range (how much). 
  6. The context of the data, e.g. what the units of the data are, must be defined in order for an analyst to successfully trust and consume the data. 
    • Unitless data is much less useful. Note that this implies common units across an enterprise, which is harder to do than one would think. Defining and encouraging a Master Data Model helps restrict units and meaning to allow different people to utilize data that they haven't produced. 
    • Data that has been vetted is 'trusted' which means that someone is standing behind it. Standards around trust are important when you have many owners of data. They imply standards around data quality and context. 
  7. Data must be secure.
    • Access must be restricted to specific owners. 
    • Those owners must be able to centrally manage permission granting to share data with other users. 
    • Users must be authenticated to the overall system, and can only access data that the data owners have authorized them to. 
    • All actions must be audited.
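
To make requirement 3 concrete, here is a minimal sketch of what scoring an ingested batch against a schema could look like. Everything in it is illustrative -- the record type, the validation rule, and how the threshold gets enforced are assumptions, not features of any particular product:

import java.util.List;

public class SchemaConformanceScorer {

  /** A single validation rule: returns true if a raw record conforms to the expected schema. */
  public interface RecordValidator {
    boolean conforms(String rawRecord);
  }

  /**
   * Score a batch of ingested records as the fraction that conform to the schema.
   * A score below an agreed-upon threshold would be treated as an operational issue.
   */
  public double score(List<String> rawRecords, RecordValidator validator) {
    if (rawRecords.isEmpty()) {
      return 1.0; // an empty batch contains nothing non-conforming
    }
    long conforming = 0;
    for (String record : rawRecords) {
      if (validator.conforms(record)) {
        conforming++;
      }
    }
    return ((double) conforming) / rawRecords.size();
  }
}
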
These requirements are foundational to a data platform that houses many diverse data sets. You could build one without them, but it would only work for a limited set of data, which is why startups don't care about data governance, and why most NoSQL products are only now starting to think about security. They don't need to -- and to be clear, that's perfectly fine. But that doesn't work or scale at the enterprise level.

What would an Enterprise Data Platform that implemented the requirements above enable? It would enable qualified, standardized, and secure use of data for both analytics and runtime consumption. As a company ran different kinds of analytic efforts on its data and generated runtime models from those efforts, other analysts could reuse the input, intermediate, and generated data in different, unanticipated, "recombinant" ways, because they could trust the data and apply company standards to it. The platform would become an enabler, allowing analysts to discover new insights by removing the overhead of managing the data.

I have other requirements of an Enterprise Data Platform besides management, but management is a foundational aspect when you are curating very many, very diverse sets of data across multiple storage platforms. I hope to start addressing those other requirements shortly.

Thursday, July 12, 2012

Calculating Conditional Probability with MapReduce part 4: Applying Correlation and Independence to get better results

In my last post I discussed using mapreduce to calculate item to item conditional probabilities, in other words, the probability that you will click on page B given that you have clicked on page A.

In this post I'm going to discuss how to determine whether a user click on page A is independent of a user click on page B. I went over this in the first post, but it's worth revisiting because taking the theory literally leads to confusion about why it is relevant for recommending products or content.

Independence and Correlation

Some review of the theory using common sense: if I roll two dice together, it's obvious that the probability of getting a 6 on the first die is independent of the probability of getting a 6 on the second die. If, on the other hand, I have 4 blue balls and 4 red balls in a drawer, and I pulled out a blue ball the first time I reached in, the probability of pulling out a red ball has gone up, because now there are 3 blue balls and 4 red balls left. In the first scenario,

P(roll a 6 with die A | roll a 6 with die B) = P(roll a 6 with die A)

in the second: 

P(red ball | blue ball) = # of red balls/(# of red balls + # of blue balls left). 

Given that I've reduced the number of blue balls, P(red | blue) > the original P(red): I'm more likely to pick a red ball if I've picked a blue ball first.

How does that translate into whether to recommend page B given the user has viewed page A? URIs aren't balls in a drawer, and the theory of independent vs dependent events doesn't really apply to page views...until you consider that the implications of the independence test actually make sense when deciding whether to recommend page B while the user is viewing page A.

Given that we know P(B | A) and P(B), and we know that 
  1. for independent events P(B | A) = P(B)
  2. for negatively correlated events P(B | A) < P(B)
then any calculated P(B | A) that is significantly less than or equal to P(B) means that recommending page B while the user is on page A is likely to disengage the user, because viewers are more likely to view page B without viewing page A than they are to view page B after viewing page A. 

Calculating Probabilities Again
From above, we need to calculate P(B | A) and P(B) to determine if B should be recommended when the user is viewing A. We have P(B | A) from the last post: the output of our last reducer was 

    uriX     uriY=P(Y | X) uriZ=P(Z | X)...


As for calculating raw probabilities, we can do this by taking the output from our group-by-user mapreduce from the second post, which looked like

   user1  pageA:num_views_A_by_user1  pageB:num_views_B_by_user1...

A map job that submitted key:value pairs from each line of that output, and a reduce job that grouped by (page uri) key and summed the counts would give a total count of page views per page.

public static class AggregateViewCountsMapper extends
   Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    public void map(final LongWritable key, final Text value,
        final Context context) {
        // assume output of GroupViewsByUser:
        // user1\turi1:count1\turi2:count2...\turiN:countN

        String raw = value.toString();

        String fields[] = raw.split("\t");

        long totalCount = 0;
        // fields[0] is the user ID; the remaining fields are uri:count pairs
        for (int i = 1; i < fields.length; i++) {

            String rawFieldData = fields[i];
            String tokens[] = rawFieldData.split(":");

            if (tokens.length != 2) {
                context.getCounter(AggregateViewCounters.BAD_URI_SUBFORMAT).increment(1);
                continue;
            }
            // we care about both tokens: emit uri => count, and add the count to the running total
            try {
                String count = tokens[1].replaceAll("\\s", "");
                long actualCount = Long.parseLong(count);
                context.write(new Text(tokens[0]), new LongWritable(actualCount));
                totalCount += actualCount;
            } catch (NumberFormatException e) {
                context.getCounter(AggregateViewCounters.BAD_URI_SUBFORMAT).increment(1);
            } catch (IOException e) {
                context.getCounter(
                    AggregateViewCounters.MAP_CONTEXT_WRITE_IO_EXCEPTION)
                    .increment(1);
                e.printStackTrace();
            } catch (InterruptedException e) {
                context.getCounter(
                    AggregateViewCounters.MAP_CONTEXT_WRITE_INTERRUPTED_EXCEPTION)
                    .increment(1);
                e.printStackTrace();
            }
        }

        // write this record's total view count to a guaranteed unique key
        try {
            context.write(new Text(UNIQUE_KEY),
                new LongWritable(totalCount));
        } catch (IOException e) {
            context.getCounter(
                AggregateViewCounters.MAP_CONTEXT_WRITE_IO_EXCEPTION)
                .increment(1);
            e.printStackTrace();
        } catch (InterruptedException e) {
            context.getCounter(
                AggregateViewCounters.MAP_CONTEXT_WRITE_INTERRUPTED_EXCEPTION)
                .increment(1);
            e.printStackTrace();
        }
    }
}


A special key (UNIQUE_KEY above) is used so that the per-record totals can be summed into a total view count across all pages.

The reducer is a simple aggregator that sums the counts for each key; the final output would look like this:

pageA  total_view_A_ct
pageB  total_view_B_ct
....
UNIQUE_KEY total_view_all_pages_ct

The number of lines of output is equal to the total number of pages in your candidate set+1.
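
For reference, here is a minimal sketch of what that aggregating reducer could look like. The class name is assumed (it isn't shown in the original post); it simply sums the LongWritable counts emitted by the mapper above for each key:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AggregateViewCountsReducer extends
    Reducer<Text, LongWritable, Text, LongWritable> {

  @Override
  public void reduce(Text key, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    // sum every partial count emitted for this page (or, for UNIQUE_KEY,
    // every per-record total), yielding the overall view count per key
    long sum = 0;
    for (LongWritable value : values) {
      sum += value.get();
    }
    context.write(key, new LongWritable(sum));
  }
}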

Notes on Final Calculations

I'm leaving this one as a mental exercise:
If the data set is small (<10k pages), you could write a script to calculate the probabilities of pages A..N by dividing 

total views of pageA / total view count (the UNIQUE_KEY value)
..
total views of pageX / total view count (the UNIQUE_KEY value)

With those probabilities in place, you can now build a model of views and keep each item B if 

   P(B | A) > P(B)

Note that building the model consists of getting the item:item output, and filtering items as above.
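
As an illustration (this is not code from the cpsamples repo), the filtering step could look like the sketch below. It assumes the raw probabilities have already been loaded into an in-memory map keyed by URI, and that each item:item line is formatted as uriX followed by tab-delimited uriY=P(Y | X) entries, as produced by the reducer from the previous post; the class and method names are hypothetical:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RecommendationFilter {

  /**
   * Keep only the candidates B where P(B | A) > P(B).
   *
   * @param itemToItemLine   one line of item:item output: uriA\turiB=P(B|A)\turiC=P(C|A)...
   * @param rawProbabilities map of uri -> P(uri), built from the aggregated view counts
   * @return the candidate URIs worth recommending alongside uriA
   */
  public List<String> filter(String itemToItemLine, Map<String, Double> rawProbabilities) {
    List<String> keep = new ArrayList<String>();
    String[] fields = itemToItemLine.split("\t");

    // fields[0] is uriA; the rest are uriB=P(B|A) pairs
    for (int i = 1; i < fields.length; i++) {
      String[] pair = fields[i].split("=");
      if (pair.length != 2) {
        continue; // skip malformed entries
      }
      String uriB = pair[0];
      double conditional = Double.parseDouble(pair[1]); // P(B | A)
      Double raw = rawProbabilities.get(uriB);          // P(B)

      if (raw != null && conditional > raw) {
        keep.add(uriB);
      }
    }
    return keep;
  }
}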

If the data set is larger, and the overall size of the set spans several blocks, then processing the results via mapreduce becomes cumbersome if you don't know the total view count of all pages. I think that this last step definitely requires storing the raw probabilities in a lookup table, then processing the conditional probabilities and filtering the lists by doing the comparison between P(B | A) and P(B).

How to build the model and store the data above for a more optimal retrieval during model build time is definitely a completely new blog post :)  

Sunday, June 10, 2012

Calculating Conditional Probability via Mapreduce part 3: item to item mappings

In my last post I gave an example of code that grouped views by user ID. In this post I'm going to show how to take the output of that mapreduce and generate item to item mappings that show the conditional probability of URIs being clicked on, given that a specific URI was clicked on.

The output from the views-by-user-ID reducer had the key (the user ID), followed by tab-delimited URI:count pairs (colon separated).

user1\turiX:3\turiY:4\turiZ:12....
user2\turiW:5\turiQ:5
...

We want to generate output that shows the following: given that the user has clicked on X, how many times did they click on Y vs Z, or P? In this output, we are not concerned with specific users, but user behavior in general. We want to sum up general conditional probabilities.

In the mapper, our input is as above: a more concrete example is :

user1\thttp://foo.com/page1:12\thttp://foo.com/page2:23\thttp://foo.com/page3:3

In the mapper we generate permutations of the different key pairs (and ignore the counts for now). In the example above we would generate:

key=>value
http://foo.com/page1=>http://foo.com/page2
http://foo.com/page1=>http://foo.com/page3
http://foo.com/page2=>http://foo.com/page1
http://foo.com/page2=>http://foo.com/page3
http://foo.com/page3=>http://foo.com/page1
http://foo.com/page3=>http://foo.com/page2

Here is the mapper code to generate the permutations
public static class ItemToItemGroupingMapper extends
   Mapper<LongWritable, Text, Text, Text> {

  @Override
  public void map(final LongWritable key, final Text value,
      final Context context) {
    // this job takes the input from GroupViewsByUser
    // user1\turiX:3\turiY:4\turiZ:12
    String raw = value.toString();

    String fields[] = raw.split("\t");

    if (fields.length == 1) {
      context.getCounter(ItemToItemCounters.BAD_MAP_INPUT_FORMAT)
          .increment(1);
      return;
    }
    // fields[0] is the user ID, we skip that for item-item co-occurrence.
    for (int i = 1; i < fields.length; i++) {
      // permute all combinations of co-occurrences
      for (int j = 1; j < fields.length; j++) {
        if (j == i) {
          continue; // dont count self co-occurrences
        }

        String partsI[] = fields[i].split(GroupViewsByUser.INTERNAL_DELIMITER);
        String partsJ[] = fields[j].split(GroupViewsByUser.INTERNAL_DELIMITER);

        if (partsI.length != 2 || partsJ.length != 2) {
          context.getCounter(ItemToItemCounters.BAD_MAP_INPUT_FORMAT)
              .increment(1);
          return;
        }
        try {
          // emit uriI => uriJ for every co-occurring pair
          LOGGER.info(partsI[0] + "=>" + partsJ[0]);
          context.write(new Text(partsI[0]), new Text(partsJ[0]));
        } catch (IOException e) {
          context.getCounter(ItemToItemCounters.MAP_CONTEXT_WRITE_IO_EXCEPTION)
              .increment(1);
          e.printStackTrace();
        } catch (InterruptedException e) {
          context.getCounter(ItemToItemCounters.MAP_CONTEXT_WRITE_INTERRUPTED_EXCEPTION)
              .increment(1);
          e.printStackTrace();
        }
      }
    }
  }
}

In the reducer, these pairs show up and we can aggregate the counts of each URI. Again, we're not factoring in a normalized weighting at map time. In other words, from the example above, we're not factoring in the 12 hits of page1 vs the 23 hits of page2 when doing co-occurrences, because, as noted above, we're ignoring counts and focusing on co-occurrences.

We could do some calculations on these pairs at reduce time -- for a specific URI X, we would sum the co-occurrences with each unique URI Y and divide by the total co-occurrences of X with all URIs, to get the fraction of X's co-occurrences that Y makes up. We could then reverse sort the URIs by co-occurrence probability to produce output like this for uriX:

uriX\turiY=P(Y | X)\turiZ=P(Z | X)


a real version would look like:

/foo/bar.html   /foo/baz.html=0.5       /foo/car.html=0.5

As expected, in the example above, the probabilities all sum to 1.0.
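
To make the arithmetic concrete with assumed counts (chosen only to reproduce the line above): if /foo/bar.html co-occurred 2 times with /foo/baz.html and 2 times with /foo/car.html, then the total co-occurrence count for /foo/bar.html is 4, and

P(/foo/baz.html | /foo/bar.html) = 2/4 = 0.5
P(/foo/car.html | /foo/bar.html) = 2/4 = 0.5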

The reducer code to aggregate counts is below. It's more complex than the previous GroupViewsByUser code because of the probability calculation and reverse sorting.


public void reduce(Text key, Iterable<Text> input, Context context) {

	Map<Text, Integer> URItoCount = new HashMap<Text, Integer>();
	Iterator<Text> it = input.iterator();
	long totalCount = 0;
	// these are co-occurrences: sum them.
	while (it.hasNext()) {
		Text value = new Text(it.next());

		Integer count = URItoCount.get(value);

		if (count == null) {
			URItoCount.put(value, Integer.valueOf(1));
		} else {
			URItoCount.put(value, Integer.valueOf(count + 1));
		}

		totalCount++;
	}

	// emit tab delimited and from greatest to least
	// uri2=prob(uri2 | uri1) uri3=prob(uri3 | uri1)

	Map<Double, List<Text>> byProb = new HashMap<Double, List<Text>>();
	List<Double> sortedSet = new ArrayList<Double>();

	for (Map.Entry<Text, Integer> entry : URItoCount.entrySet()) {
		double probability = ((double) entry.getValue()) / totalCount;
		List<Text> list = byProb.get(probability);

		if (list == null) {
			list = new ArrayList<Text>();
			byProb.put(probability, list);
			sortedSet.add(probability);
		}
		list.add(entry.getKey());
	}

	// sort from greatest to least.
	Collections.sort(sortedSet);
	Collections.reverse(sortedSet);

	// emit uri:value where value = count / total
	StringBuffer sbuf = new StringBuffer();

	for (double probability : sortedSet) {
		List<Text> uris = byProb.get(probability);

		for (Text uri : uris) {
			sbuf.append(uri.toString()).append(INTERNAL_DELIMITER).append(probability).append(FIELD_DELIMITER);
		}
	}

	// convert list to hadoop Text
	String coOccurrenceBuffer = sbuf.toString();
	LOGGER.debug("reduce: key = " + key.toString() + ", value = "
			+ coOccurrenceBuffer);

	Text allViews = new Text(coOccurrenceBuffer);

	try {
		context.write(key, allViews);
	} catch (IOException e) {
		LOGGER.error(e);
		context.getCounter(
				GroupByUserCounters.REDUCE_CONTEXT_WRITE_IO_EXCEPTION)
				.increment(1);
	} catch (InterruptedException e) {
		LOGGER.error(e);
		context.getCounter(
				GroupByUserCounters.REDUCE_CONTEXT_WRITE_INTERRUPTED_EXCEPTION)
				.increment(1);
	}
}


Are we done yet? Well....no. While it's nice to see conditional probabilities, they don't tell us anything about whether any two URIs are correlated. That's next. Sample code can be found at https://github.com/arunxarun/cpsamples.

Tuesday, May 15, 2012

Calculating Conditional Probability via Mapreduce part 2: group by userID

In my last post I walked through the basics of conditional probability in order to get around to actually discussing how I would implement it using mapreduce.

The whole calculation of Conditional Probability is a little contrived -- I admitted that up front -- my goal was to provide a 'non word count' example of mapreduce.  It also requires multiple mapreduce steps. I'll  provide the context around those steps to show how they link together to  calculate conditional probability and correlation, both of which are required if we are going to recommend or avoid recommending products when a specific product is shown.

The entire source for all samples (as I write them) can be found at:

https://github.com/arunxarun/cpsamples


Starting Assumptions: the User View Log.
Log format is dependent on who wrote the application, but if you were using something like Common Log Format, you would see the following kind of log:

127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /products/p1234.html HTTP/1.0" 200 2326

where:

  1. frank is the userid
  2. The timestamp is the bracketed next field
  3. The quoted string afterward contains the HTTP verb plus the uri plus the protocol version
  4. The next field contains the HTTP status code
  5. The final field is the size of the returned object in bytes

Mapreduce 1: group by UserID
The first mapreduce will group all pages by user during the map phase and sum page counts by user during the reduce phase. It does this by parsing the input lines and emitting the user and URI as a pair into the context:
    
/**
 * the key the user, the value is the URI
 *
 */
public static class GroupViewsByUserMapper extends Mapper<LongWritable, Text, Text, Text> {
 
  @Override
  public void map(final LongWritable key, final Text value,  final Context context) {
    // assume common log format: 
    //127.0.0.1\t-\tfrank\t[10/Oct/2000:13:55:36 -0700]\t"GET /apache_pb.gif HTTP/1.0"\t200\t2326\n
    String raw = value.toString();
  
    String fields[] = raw.split("\t");
  
    if(fields.length < 6 || fields.length > 7) {
        context.getCounter(GroupByUserCounters.BAD_LOG_FORMAT).increment(1);
        return;
    }
  
    String userIdText = fields[2];
    String rawHttpInfo = fields[4];
   
    String uriComponents[] = rawHttpInfo.split(" ");
  
    if(uriComponents.length < 3) {
         context.getCounter(GroupByUserCounters.BAD_URI_SUBFORMAT).increment(1);
         return;
    }
    Text userId = new Text(userIdText);
    Text uri = new Text(uriComponents[1]);

    try {
       context.write(userId, uri);
    } catch (IOException e) {
         LOGGER.error(e);
         context.getCounter(GroupByUserCounters.MAP_CONTEXT_WRITE_IO_EXCEPTION)
            .increment(1);
    } catch (InterruptedException e) {
         LOGGER.error(e);
         context.getCounter(GroupByUserCounters.MAP_CONTEXT_WRITE_INTERRUPTED_EXCEPTION)
            .increment(1);
    }
  
  }
}    


The reduce phase will aggregate counts per URI for each user into a text-based map. Note that while SequenceFiles are a more efficient way to write this information, we're using Text output to minimize complexity.

user1\t/pages/foo.html:2\t/pages/bar.html:1\t/pages/baz.html:1

/**
 * emit user: uri1:count1...uriN:countN 
 *
 */
public static class GroupViewsByUserReducer extends Reducer<Text, Text, Text, Text> {
 
  @Override
  public void reduce(Text key, Iterable<Text> input, Context context) {
  
    Map<Text, Integer> URItoCount = new HashMap<Text, Integer>();
    Iterator<Text> it = input.iterator();
  
    // aggregate views by uri.
    while(it.hasNext()) {
      Text value = new Text(it.next());
   
      Integer count = URItoCount.get(value);
   
      if(count == null) {
        URItoCount.put(value, Integer.valueOf(1));
      } else {
        URItoCount.put(value, Integer.valueOf(count+1));
      }
   
    }
  
    // emit view:count format, tab delimited.
  
    StringBuffer sbuf = new StringBuffer();
  
    for(Map.Entry<Text, Integer> entry : URItoCount.entrySet()) {
      sbuf.append(entry.getKey().toString()).append(":").append(entry.getValue());
      sbuf.append("\t");
    }
  
    // convert list to hadoop Text
    String viewBuffer = sbuf.toString();
    LOGGER.debug("reduce: key = "+key.toString()+", value = "+viewBuffer);
    Text allViews = new Text(viewBuffer);
  
    try {
      context.write(key, allViews);
    } catch (IOException e) {
      LOGGER.error(e);
      context.getCounter(GroupByUserCounters.REDUCE_CONTEXT_WRITE_IO_EXCEPTION)
         .increment(1);
    } catch (InterruptedException e) {
      LOGGER.error(e);
      context.getCounter(GroupByUserCounters.REDUCE_CONTEXT_WRITE_INTERRUPTED_EXCEPTION)
         .increment(1);
   
    }
  }
}
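
For completeness, here's what a driver for this job might look like -- a minimal sketch that assumes the mapper and reducer above live in a GroupViewsByUser class and that the input and output paths are passed in as arguments; it is not taken from the cpsamples repo:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GroupViewsByUserDriver {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "group views by user");

    job.setJarByClass(GroupViewsByUserDriver.class);
    job.setMapperClass(GroupViewsByUser.GroupViewsByUserMapper.class);
    job.setReducerClass(GroupViewsByUser.GroupViewsByUserReducer.class);

    // both the mapper and the reducer emit Text keys and Text values
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));   // raw view logs
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // user\turi:count output

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}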

A Final Note: Unit Testing
I'd like to point out some quick helper classes that  I used to aid my unit testing -- these are at

https://github.com/arunxarun/cpsamples/tree/master/src/main/java/org/arunxarun/mapreduce/unitmocks.

I've started to use these (and/or variants of these) to quickly validate that my mappers and reducers are emitting what I think they should: the MockRecordWriter used in the test below allows me to retrieve mapped values and validate them at map time:


        @Test
 public void testMapperValidInput() throws IOException, InterruptedException {

  GroupViewsByUser.GroupViewsByUserMapper mapper = new GroupViewsByUser.GroupViewsByUserMapper();

  MockRecordWriter rw = new MockRecordWriter();
  Mapper<LongWritable, Text, Text, Text>.Context context = getMapperContext(mapper, rw);

  LongWritable key = new LongWritable(1);
  Text value = new Text("127.0.0.1\t-\tfrank\t[10/Oct/2012:13:55:36 -0700]\t\"GET /products/prod2.html HTTP/1.0\"\t200\t1002\n");
  mapper.map(key, value, context);
  LongWritable key2 = new LongWritable(2);
  Text value2 = new Text("127.0.0.1\t-\tjoe\t[10/Oct/2012:14:35:22 -0700]\t\"GET /products/prod1.html HTTP/1.0\"\t200\t2326\n");
  mapper.map(key2, value2, context);
  Map<Text, Text> map = rw.getMap();
  assertNotNull(map);
  assertTrue(map.size() == 2);
  assertNotNull(map.get(new Text("joe")));
  assertNotNull(map.get(new Text("frank")));

 }


The getMapperContext() method uses most of these stubs (that return empty values) when it returns a context object that can be passed into the mapper.

        /**
  * return a valid mapper context.
  * 
  * @param mapper
  * @param rw
  * @return
  * @throws IOException
  * @throws InterruptedException
  */
 private Mapper<LongWritable, Text, Text, Text>.Context getMapperContext(
   Mapper<LongWritable, Text, Text, Text> mapper,
   MockRecordWriter rw) throws IOException,
   InterruptedException {
  Configuration conf = new Configuration();
  TaskAttemptID taskId = new TaskAttemptID();

  Mapper<LongWritable, Text, Text, Text>.Context context = mapper.new Context(
    conf, taskId, new MockRecordReader(), rw,
    new MockOutputCommitter(), new MockStatusReporter(),
    new MockInputSplit());

  return context;

 }

The reducer logic is also testable:


        @Test
 public void testReducerValidInput() throws IOException,
   InterruptedException {

  GroupViewsByUser.GroupViewsByUserReducer reducer = 
                  new GroupViewsByUser.GroupViewsByUserReducer();

  Configuration conf = new Configuration();
  TaskAttemptID taskId = new TaskAttemptID("foo", 1, true, 1, 12);
  MockRecordWriter dwr = new MockRecordWriter();

  Reducer<Text, Text, Text, Text>.Context context = getReducerContext(reducer, dwr, conf, taskId);

  Iterable<Text> input = new Iterable<Text>() {

   @Override
   public Iterator<Text> iterator() {
    List<Text> list = new ArrayList<Text>();

    list.add(new Text("http://foobar.com"));
    list.add(new Text("http://foobar.com"));
    list.add(new Text("http://foobar.com"));
    list.add(new Text("http://foobar.com"));

    return list.iterator();
   }

  };

  Text key = new Text("userkey");

  reducer.reduce(key, input, context);

  Map<Text, Text> map = dwr.getMap();

  assertNotNull(map);
  assertTrue(map.size() == 1);
  assertNotNull(map.get(new Text("userkey")));
  Text value = map.get(key);
  assertEquals("http://foobar.com:4\t", value.toString());

 }


Again, the getReducerContext() method returns a mocked-up context that is passed into the reducer and evaluated. The MockRecordWriter can be interrogated for results. Here is that getReducerContext():


        /**
  * return a valid reducer context
  * @param reducer
  * @param dwr
  * @param conf
  * @param id
  * @return
  * @throws IOException
  * @throws InterruptedException
  */
 private Reducer<Text, Text, Text, Text>.Context getReducerContext(
   Reducer<Text, Text, Text, Text> reducer, 
   MockRecordWriter dwr,
   Configuration conf, 
   TaskAttemptID id) throws IOException, 
   InterruptedException {

  MockOutputCommitter doc = new MockOutputCommitter();
  MockStatusReporter dsr = new MockStatusReporter();
  MockRawKeyValueIterator drkv = new MockRawKeyValueIterator();

  Reducer<Text, Text, Text, Text>.Context context = reducer.new Context(
  conf, id, drkv, new MockCounter(), new MockCounter(), dwr,
  doc, dsr, null, Text.class, Text.class);

  return context;
 }

In my next post I'll discuss how to take the output of this job and generate item to item mappings. 

Sunday, March 18, 2012

Calculating Conditional Probability via Mapreduce Part 1: the theory

Motivation:

A colleague of mine was bemoaning the lack of non-trivial mapreduce examples out there, and wanted to teach a class on how to apply mapreduce to "something other than word count". This sounded like a great idea to me, so I volunteered to provide a mapreduce implementation of Conditional Probability (CP) calculations from user clickstream data.

CP calculation is one of many collaborative filtering techniques used to deliver personalized product/content recommendations to users at sites like Amazon, Pandora, Netflix, etc. Conditional Probability is the probability of one event happening once another has occurred. Doing this for page views can be used to determine which page views are correlated. Suggesting correlated pages to a user can greatly enhance their browsing and ultimately their purchasing experience.

For a web application, page correlation is possible to determine because user actions map to page views of products or content that are logged to the application servers. From those logs we can determine which pages are more likely to be viewed together by calculating potential correlations between all available pages. For any large-scale website, this means that gigabytes of data per day need to be processed to calculate CP and correlation -- applying mapreduce in this case allows large amounts of log data to be processed in parallel.

Implementing CP calculations via mapreduce is a great way to start to understand how collaborative filtering is done at scale -- it is initially non-intuitive, but ends up being an excellent way to understand how mapreduce works. Of course, CP calculation is only one part of a larger set of steps, one of which must be determining page correlation once CP has been calculated.

For me, it was necessary to understand Conditional Probability in order to correctly implement the CP calculation as well as understand how to use it to determine whether events are correlated.  And that in turn requires a basic understanding of Probability. Note: my interpretation of basic probability concepts is solely mine, and I welcome all comments and corrections :)

Probability Overview:

The probability that an event occurs can be stated as P(A) = (total count of event A)/(total count of all events). For example, if I say there is a 5% probability that I will introduce a bug into our source code, that means that 5 out of every 100 commits I make contain bugs (people on my team would tend to round that number upward, but they only have anecdotal evidence so far:)

At this point it's useful to draw a picture. The Probability Space for a set of potential events is the representation of all possible events that can happen, with their assigned probabilities, expressed as P(Event).

[Venn diagram: the probability space, with overlapping events A and B and a mutually exclusive event C]

In the Venn Diagram above, the intersection of P(A) and P(B) is shown in purple, and is expressed as A ∩ B. This is the probability space where both A and B occur, aka the intersection of P(A) and P(B). The complete space covered by A and B is expressed as A ∪ B, aka the union of P(A) and P(B).

If two events are mutually exclusive, that means they can never occur together. In the above example, Event C is mutually exclusive with both A and B.

Independent events are never mutually exclusive events. Since independent events by definition may occur together, they cannot be mutually exclusive. For example: I can wear a blue shirt and/or commit a bug. These two events may be independent. They are not mutually exclusive.  I cannot wear a blue shirt and wear no shirt at the same time. Those two events are mutually exclusive.

So events that can co-occur may be independent -- in other words they may not be correlated. If A and B are independent, then the probability that one happens does not depend on whether the other happens. They may also be dependent -- B may occur more frequently after A occurs. I'll discuss determining independence more after providing a definition of Conditional Probability.

Conditional Probability, Defined and Applied

Conditional Probability, the probability of B occurring when A has occurred, is written as P(B | A). From the Venn Diagram above, we are explicitly interested in the B ∩ A area -- the intersection of A and B -- but only when A occurs. This can be expressed as

P(B | A) = P(B ∩ A)/P(A)

A Venn Diagram doesn't really show the 'in between state' of A (occurred) and B (not occurred)  too well.  A better way to visualize CP is to use a Probability Tree. In this tree, each branch represents a set of probabilities  for a single event. Events are represented as nodes. Subsequent events are sub branched from the original event nodes. The sum of each set of branches that share a common origin node must be 1 (or 100%).

[Probability tree: branches for A and A', each followed by branches for B and B']
In the tree above, I'm using some new notation. A' is the complement of A, which means that P(A') is the probability that A will not happen. The complementary probability is accounted for explicitly in a probability tree, whereas it is only 'implied' in a Venn diagram.

This tree shows the possible state map for A and B, in a specific order. If A occurs, there is a probability that B will or will not occur, noted by the conditional probabilities. The same set of possible B states is available if A doesn't occur. The probability tree makes it clear that conditional probabilities represent the state where A (or A') have occurred.

The tree above is incomplete because it doesn't assign actual probabilities to the states. Possible probabilities are shown below.
[Probability tree with assigned probabilities: P(A) = 25%, P(B | A) = 33%, P(B | A') = 55%]
To get the actual probability of an event, you traverse the tree and multiply the values of the branches traversed. This works because of the previous definitions. Remember that B occurring together with A is represented as B ∩ A. From the equation above,

P(B | A) = P(B ∩ A)/P(A)  ->
P(B ∩ A) = P(A)*P(B | A)

P(A) = 25%, P(B | A) = 33%, P(B ∩ A) = 25%*33% = 8.25%

The probability tree above defined an event space with two events that co-occur. One thing it doesn't give us is P(B). But we can get P(B), assuming that A is the only other event in the space, by adding up end states. P(B) can be expressed as P(B ∩ A) + P(B ∩ A'). If we traverse A->B to get P(B ∩ A), we get P(A)*P(B | A). If we traverse A'->B to get P(B ∩ A'), we get P(A')*P(B | A'). The sum of these states is the total probability of B happening in this event space:

P(B) = P(A)*P(B | A) + P(A')*P(B | A')
        = 25%*33% + 75%*55% = 8.25% + 41.25% = 49.5%.

Conditional Probability and Independent Events

All of that is neat (really!), but how do we tie it back to providing real value to our site?  We do this by realizing how CP helps determine whether an event is independent or not. If A and B are truly independent, then

P(B | A) = P(B).

In other words, B is independent from A when B happens regardless of whether A happens. An example of this is two consecutive rolls of a die. The probability of getting a 1 on the second roll is not affected by the value obtained from the first roll.

Contrast this with two dependent events. If I had 5 pairs of black socks in a drawer (B) and 5 pairs of white socks (W), and wanted to calculate the probability of getting a black pair, P(B), when pulling a pair out of my drawer, it would originally be 50%, but it would change on my second pull, because there would be one less pair of black or white socks left after the first pull.

If P(B | A) = P(B), and

P(B | A) = P(B ∩ A)/P(A), then for independent events,

P(B) = P(B ∩ A)/P(A) because P(B | A) = P(B) and

P(B)*P(A) = P(B ∩ A)

So if B is independent from A, the probability of B happening after A is the same as the probability of B multiplied by the probability of A.

If P(B ∩ A) >  P(B)*P(A), the two are positively correlated -- there is a greater possibility of B occurring when A occurs than the two occurring independently.

If P(B ∩ A) <  P(B)*P(A), the two are negatively correlated -- there is less of a possibility of B occurring when A occurs than the two occurring independently.
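
Plugging in the illustrative numbers from the probability tree example above:

P(B ∩ A) = P(A)*P(B | A) = 25%*33% = 8.25%
P(A)*P(B) = 25%*49.5% = 12.375%

Since 8.25% < 12.375%, P(B ∩ A) < P(B)*P(A), so in that example A and B would be negatively correlated.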


Variable (in)dependence is something to consider when recommending products or content. If we recommend content that we think is correlated when it is actually (a) independent or (b) negatively correlated, that recommendation may drown out other recommendations that are truly positively correlated, but whose probabilities are much lower than that of the independent or negatively correlated item.

Conclusion (So Far)

As discussed above, calculating CP is part of a larger effort to determine whether pages are correlated, independent, or anti-correlated. To do this for two events A and B it is necessary to know P(B | A), P(B), and P(A). In my next post I'll provide a high-level algorithm to get these numbers and determine correlated page views from a generic server log containing pages viewed by user ID.