Sunday, June 10, 2012

Calculating Conditional Probability via Mapreduce part 3: item to item mappings

In my last post I gave an example of code that grouped views by user ID. In this post I'm going to show how to take the output of that mapreduce and generate item to item mappings that show the conditional probability of URIs being clicked on, given that a specific URI was clicked on.

The output from the views-by-user-ID reducer had the key, followed by a colon, followed by tab delimited URIs and counts (colon separated).


We want to generate output that shows the following: given that the user has clicked on X, how many times did they click on Y vs Z, or P? In this output, we are not concerned with specific users, but user behavior in general. We want to sum up general conditional probabilities.

In the mapper, our input is as above: a more concrete example is :


In the mapper we generate permutations of the different key pairs (and ignore the counts for now). In the example above we would generate:


Here is the mapper code to generate the permutations
public static class ItemToItemGroupingMapper extends
   Mapper {

  public void map(final LongWritable key, final Text value,
	final Context context) {
        // this job takes the input from GroupViewsByUser
	// user1\turiX:3\turiY:4\turiZ:12
	String raw = value.toString();
	String fields[] = raw.split("\t");

	if(fields.length == 1) {
	// fields[0] is the user ID, we skip that for item-item co-occurrence.
	for(int i = 1; i < fields.length;i++) {
		// permute all combinations of co-occurrences
		for(int j = 1;j < fields.length;j++) {
			if(j == i) {
				continue; // dont count self co-occurrences
			else {
		 	    String partsI[] = fields[i].split(GroupViewsByUser.INTERNAL_DELIMITER);
			    String partsJ[] = fields[j].split(GroupViewsByUser.INTERNAL_DELIMITER);
			    if(partsI.length != 2 || partsJ.length != 2) {
		        try {[0]+"=>"+partsJ[0]);
			    context.write(new Text(partsI[0]),new Text(partsJ[0]));
			} catch (IOException e) {
			} catch (InterruptedException e) {

In the reducer, these pairs would show up and we can aggregate the counts of each URI. Again, we're not factoring in a normalized weighting at map time. In other words, from the example above, we're not factoring in the 12 hits of page1 vs the 23 hits of page2 when doing co-occurrences, because, as noted above, we're ignoring counts and focusing on co-occurrences

We could do some calculations on these pairs at reduce time -- for a specfic URI X, we would sum total co-occurrences per unique URI Y and divide by total co-occurrences of all URIs to get a number that represented the probability of the total co-occurrences that X and Y made up. We could then reverse sort the URIs by co-occurrence probability to produce output like this for uriX:

uriX\turiY=P(Y | X)\turiZ=P(Z | X)

a real version would look like:

/foo/bar.html   /foo/baz.html=0.5       /foo/car.html=0.5

As expected, in the example above, the probabilities all sum to 1.0.

The reducer code to aggregate counts is below. Its more complex than the previous GroupViewsByUser code because of the probability calculation and reverse sorting.

public void reduce(Text key, Iterable input, Context context) {

        Map URItoCount = new HashMap();
	Iterator it = input.iterator();
	long totalCount = 0;
	// these are co-occurrences: sum them.
	while (it.hasNext()) {
		Text value = new Text(;

		Integer count = URItoCount.get(value);

		if (count == null) {
			URItoCount.put(value, Integer.valueOf(1));
		} else {
			URItoCount.put(value, Integer.valueOf(count + 1));


	// emit tab delimited and from greatest to least
	// uri2=prob(uri2 | uri1) uri3=prob(uri3 | uri1)

	Map> byProb = new HashMap>();
	List sortedSet = new ArrayList();
	for (Map.Entry entry : URItoCount.entrySet()) {
		double probability = ((double)entry.getValue())/totalCount;
		List list = byProb.get(probability);
		if(list == null) {
			list = new ArrayList();

	// sort from greatest to least.
	// emit uri:value where value = count / total 
	StringBuffer sbuf = new StringBuffer();
	for(double probability : sortedSet){
		List uris = byProb.get(probability);
		for(Text uri : uris) {				      
	// convert list to hadoop Text
	String coOccurrenceBuffer = sbuf.toString();
	LOGGER.debug("reduce: key = " + key.toString() + ", value = "
			+ coOccurrenceBuffer);

	Text allViews = new Text(coOccurrenceBuffer);

	try {
		context.write(key, allViews);
	} catch (IOException e) {
	} catch (InterruptedException e) {


Are we done yet? While it's nice to see conditional probabilities, they dont tell us anything about whether any two URIs are correlated. That's next. Sample code can be found at

1 comment: