Friday, January 11, 2013

More on the Enterprise Data Platform: Data Requirements

In my last post, I talked in very general terms about an Enterprise Data Platform (EDP) and in very specific terms about what I consider to be a core requirement of any EDP: data governance. If I have a set of services and processes that provide data governance, I have a way to manage data. But what kind of data am I trying to manage?

I'm primarily concerned with building systems that contain event data and reference data. Event data can be data copied from OLTP systems, user click streams, or machine data collected at regular intervals: anything that signals that an event has happened. Event data can be huge.

Reference data is data that can be used to classify/quantify aspects of an event.  If I'm looking at a click stream, a user ID is reference data that I can aggregate events by. In OLAP terms, events can be cast as facts, with reference data providing some of the dimension values.

This data starts to get valuable when 'raw' event data is joined to reference data, and then in turn joined to other event data along a specific reference dimension. For example, aggregating user click stream and email campaign opens by user ID could be used to track the rate at which the email campaign actually generated new users.
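To make that join concrete, here is a minimal plain-Java sketch, assuming each event has already been reduced to the user ID it references. It measures the share of email openers who also show up in the click stream, which is only a rough proxy for "generated new users". The class and method names are hypothetical, and a real pipeline would run this as a distributed join rather than over in-memory lists.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: each event list has been reduced to user IDs.
final class CampaignJoinSketch {

  // Share of users who opened the campaign email and also appear in the
  // click stream: a join of two event sets on the user ID dimension.
  static double openToClickRate(List<String> emailOpenUserIds,
                                List<String> clickUserIds) {
    Set<String> openers = new HashSet<>(emailOpenUserIds);
    if (openers.isEmpty()) {
      return 0.0; // no opens, nothing to attribute
    }
    Set<String> joined = new HashSet<>(openers);
    joined.retainAll(new HashSet<>(clickUserIds)); // keep IDs present in both
    return (double) joined.size() / openers.size();
  }

  public static void main(String[] args) {
    List<String> opens = Arrays.asList("u1", "u2", "u3", "u4");
    List<String> clicks = Arrays.asList("u2", "u4", "u4", "u9");
    System.out.println(openToClickRate(opens, clicks)); // prints 0.5
  }
}
```

Duplicate clicks by the same user (u4 above) count once, because the join is on distinct user IDs.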

In addition to that kind of analytical use, event data can be used to classify users and/or determine their affinities. This kind of derivative data is typically referenced at runtime, by the same applications that are generating the event data.

The two cases above are interesting because they have opposing requirements. Analytic data must first and foremost be consistent, especially when financial reporting and/or business decisions are being made from that data. Consistency in this case implies that when a value is written, the next read reflects the last change made to that variable.

Runtime data must first and foremost be available, because unavailability of data may compromise application behavior. Also, preference or personalization data is derivative data, generated on a set interval by a batch process, so it can tolerate some lag. Being out of sync for a long time will eventually make the application grow 'stale', but when we talk about availability at the expense of consistency, the usual case is that any inconsistency between read state and written state is resolved in sub-second time.

Why is this important?

  1. Enterprises collect a lot of this data (terabytes per day), and that scale will swamp any database that runs on a single machine. 
  2. The enterprise must therefore store data on a storage platform that spans many machines -- a cluster. 
  3. The moment data is stored in a cluster, it is subject to the CAP theorem, which states that a distributed system cannot guarantee Consistency, Availability, and Partition Tolerance at the same time. 
  4. While it is valid to want a clustered system that favors either Consistency or Availability, enterprise-level requirements always include partition tolerance, so we can have one or the other, but not both. 
  5. This means that there are usually two main systems, an analytic focused system, and a runtime facing system. The analytic focused system favors consistency, the runtime facing system favors availability.
  6. An EDP that wants to offer both analytic and runtime facing data must have at least two systems, one which is consistent for analytic data, the other which is available for runtime facing data.
So now my definition of an EDP has evolved. It must have some fundamental Data Governance, and due to the scale enterprises operate at, must have 1..N analytics focused platform(s) and 1..N runtime facing platform(s). In my next post I'll focus on the analytic focused platform requirements. 







Friday, November 16, 2012

What is an Enterprise Data Platform, Anyway?

I've been trying to write about my requirements for a Data Platform for the last month. The problem is that I was trying to write this at the exact same time that my understanding of both the requirements and the Platform was shifting, so I ended up writing the requirements, coming back to them a week later, throwing them all out, and rewriting them again. This is revision number three, and I actually feel that I've stabilized my definition. The biggest change between now and a month ago is that I realized that I am actually thinking about an Enterprise Data Platform, which is different from a Data Platform.

First of all: What is a Data Platform? Here is a first crack at a definition: a Data Platform enables people to leverage data by facilitating data collection, storage, transformation, and access. In other words, I need to be able to get data in, persist it, access it, change it, and still access it. All of that is completely possible with {name your nosql storage engine of choice}. So why am I not calling {name your nosql storage engine of choice} an Enterprise Data Platform?

I have a short answer, and a longer answer that explains it. The short answer is that Enterprise Data Platforms have another requirement: the ability to manage all of that data. 

The long answer: my experiences with databases and persistence technology started from the perspective of a developer in a startup, not an administrator or data modeler in an enterprise. I wanted to successfully collect, store, transform, and retrieve data, in order to fulfill the functional requirements of the applications I wrote. I found that a lot of the database admins and data modelers/architects I worked with actually got in the way of this fairly simple mission.

A lot of the constructs and restrictions imposed on the data model and the database by both parties seemed arbitrary, and when I asked them for their reasoning, their answers had no relevance to my single data set use case.  So, when I discovered Hadoop while working around a complex set of ETLs that had ground our database to a halt in 2007, I was elated, because both parties were sidelined and I could do what I needed to do. Their apoplectic reactions to 'schema on read' used to fill my heart with joy.  That makes my current perspective very ironic.

The change in perspective happened when I changed jobs. I went from being a developer that used Hadoop to deliver solutions to an architect/product manager position where I led a team that operationalized Hadoop, Cassandra, Mongo, and Solr and then started to store and curate enterprise level data as a centralized service offering. The moment I started thinking past pure functionality and more about operations, I started to care about a lot of the things that people who manage data (the people I used to disparage) have been caring about for a while. The moment I became a data curator or steward of a set of very diverse and valuable data, I started to see real value in Data Governance, a term that previously made me roll my eyes in impatient disgust.

Data Governance is defined in Wikipedia as a set of processes around "data quality, data management, data policies, business process management, and risk management surrounding the handling of data in an organization". As a developer in a startup tasked with doing a few things very well, I couldn't care less about data governance. I know my data: it may be vast, but it has the same schema, which I know and can code for.

However, as a member of a large enterprise storing many kinds of data who wants to use some of that data, I now must rely on data quality, definition, and security. Without those three I may be able to generate some value from that data, but the value is undermined by the lack of defined quality of the inputs, the lack of defined standards to normalize the data to, and the lack of security which means that the data could be compromised by a rogue user/process. Those 'constraints' on enterprise data are in place because they have a direct impact on the bottom line.

Breaking down Data Governance from the above definition, I get the following:
  1. Data resources must be discoverable: there must be a set of defined metadata that analysts can search for data by. 
    • Common metadata is most effective when there is a common data model, which becomes hard to enforce when an enterprise spans many different business units. 
  2. Data structure must be describable so that analysts can consume it. 
  3. Data Quality must be known and identified every time new data is ingested. 
    • When I load data from an external source, downstream processes rely on it conforming to a schema. I need to score the data by how much of it conforms to that schema.
    • When Data Quality dips below a defined level, it must be treated as an operational issue. 
  4. Data Replication must be defined and enforceable.
    • Storage systems must allow users to set a replication factor to account for storage failures. 
  5. Data Replication must be defined, enforceable, and applicable per unique data type.
    • Replicating full data sets may be a business requirement: if so, a Data Platform must replicate data along the following dimensions: resource location (where) and range (how much). 
  6. The context of the data, e.g. what the units of the data are, must be defined in order for an analyst to successfully trust and consume the data. 
    • Unitless data is much less useful. Note that this implies common units across an enterprise, which is harder to do than one would think. Defining and encouraging a Master Data Model helps restrict units and meaning to allow different people to utilize data that they haven't produced. 
    • Data that has been vetted is 'trusted' which means that someone is standing behind it. Standards around trust are important when you have many owners of data. They imply standards around data quality and context. 
  7. Data must be secure
    • Access must be restricted to specific owners. 
    • Those owners must be able to centrally manage permission granting to share data with other users. 
    • Users must be authenticated to the overall system, and can only access data that the data owners have authorized them to. 
    • All actions must be audited.
These requirements are foundational to a data platform that houses many diverse data sets. You could build one without them, but it would only work for a limited set of data. That is why startups don't care about data governance, and why most NoSQL products are only now starting to think about security. They don't need to, and to be clear, that's perfectly fine. But that approach doesn't work or scale at the enterprise level.
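As a rough illustration of the data quality requirement (item 3 in the list above), here is a plain-Java sketch that scores a freshly ingested batch by the fraction of records that conform to a schema, and flags the batch when the score falls below a threshold. The "schema" used here (a fixed number of tab-delimited fields with an integer last field), the threshold, and all names are assumptions for illustration only.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: the "schema" is an expected number of tab-delimited
// fields whose last field must be an integer. Real schemas would be richer.
final class DataQualitySketch {

  static boolean conforms(String record, int expectedFields) {
    String[] fields = record.split("\t", -1); // -1 keeps trailing empty fields
    if (fields.length != expectedFields) {
      return false;
    }
    try {
      Long.parseLong(fields[expectedFields - 1].trim());
      return true;
    } catch (NumberFormatException e) {
      return false;
    }
  }

  // Fraction of records in the batch that conform (0.0 to 1.0).
  // An empty batch scores 0.0: a load that produced nothing is suspect.
  static double qualityScore(List<String> batch, int expectedFields) {
    if (batch.isEmpty()) {
      return 0.0;
    }
    long good = batch.stream().filter(r -> conforms(r, expectedFields)).count();
    return (double) good / batch.size();
  }

  // A score below the agreed threshold should be raised as an operational issue.
  static boolean isOperationalIssue(double score, double threshold) {
    return score < threshold;
  }

  public static void main(String[] args) {
    List<String> batch = Arrays.asList("a\tb\t3", "a\tb", "a\tb\tx", "c\td\t10");
    double score = qualityScore(batch, 3);
    System.out.println(score + " " + isOperationalIssue(score, 0.95)); // prints 0.5 true
  }
}
```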

What would an Enterprise Data Platform that implemented the requirements above enable? It would enable qualified, standardized, and secure use of data for both analytics and runtime consumption. As a company ran different kinds of analytic efforts on its data, and generated runtime models from those analytic efforts, other analysts could reuse the input, intermediate, and generated data in different, unanticipated, "recombinant" ways because they can trust the data and can apply company standards to it. The platform would become an enabler, allowing analysts to discover new insights by removing the overhead of managing the data.

I have other requirements of a Data Management Platform besides management, but management is foundational when dealing with very many, very diverse sets of data across multiple storage platforms. I hope to start addressing those shortly.









Thursday, July 12, 2012

Calculating Conditional Probability with MapReduce part 4: Applying Correlation and Independence to get better results

In my last post I discussed using mapreduce to calculate item to item conditional probabilities, in other words, the probability that you will click on page B given that you have clicked on page A.


In this post I'm going to discuss how to determine whether a user click on page A is independent of a user click on page B. I went over this in the first post, but it's worth revisiting because taking the theory literally leads to confusion about why it is relevant for recommending products or content.

Independence and Correlation

Some review of the theory using common sense: if I roll two dice together, it's obvious that the probability of getting a 6 on the first die is independent of the probability of getting a 6 on the second die. If, on the other hand, I have 4 blue balls and 4 red balls in a drawer and retrieve a blue ball the first time I reach for one, the probability of pulling out a red ball next has gone up, because now there are 3 blue balls and 4 red balls left. In the first scenario,


P(6 on die A | 6 on die B) = P(6 on die A) = 1/6


in the second: 


P(red ball | blue ball drawn first) = # of red balls / (# of red balls + # of blue balls left) = 4 / (4 + 3) = 4/7


Because drawing a blue ball reduced the number of blue balls, P(red | blue) = 4/7 is greater than P(red) = 4/8 = 1/2 for the original drawer: I'm more likely to pick a red ball if I've picked a blue ball first.
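Both the dice example and the drawer example can be checked by brute force. This small Java sketch (the class name is hypothetical) enumerates every equally likely outcome: all 36 rolls of two dice, and every ordered pair of distinct balls drawn from the drawer.

```java
// Brute-force check of the dice and drawer examples by enumeration.
final class IndependenceCheck {

  // P(6 on die A | 6 on die B), counted over all 36 equally likely rolls.
  static double sixGivenSix() {
    int bSix = 0;
    int bothSix = 0;
    for (int a = 1; a <= 6; a++) {
      for (int b = 1; b <= 6; b++) {
        if (b == 6) {
          bSix++;
          if (a == 6) {
            bothSix++;
          }
        }
      }
    }
    return (double) bothSix / bSix; // same as P(6 on die A) = 1/6
  }

  // P(second ball red | first ball blue), drawing without replacement.
  // Balls 0..blue-1 are blue, the rest are red; every ordered pair of
  // distinct balls is equally likely.
  static double redGivenBlueFirst(int blue, int red) {
    int n = blue + red;
    int firstBlue = 0;
    int firstBlueThenRed = 0;
    for (int first = 0; first < n; first++) {
      for (int second = 0; second < n; second++) {
        if (first == second) {
          continue; // the same ball can't be drawn twice
        }
        if (first < blue) {
          firstBlue++;
          if (second >= blue) {
            firstBlueThenRed++;
          }
        }
      }
    }
    return (double) firstBlueThenRed / firstBlue;
  }

  public static void main(String[] args) {
    System.out.println(sixGivenSix());           // about 0.1667 (1/6)
    System.out.println(redGivenBlueFirst(4, 4)); // about 0.5714 (4/7)
  }
}
```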


How does that translate into whether to recommend page B given that the user has viewed page A? URIs aren't balls in a drawer, and the theory of independent vs. dependent events doesn't obviously apply to page views... until you consider that the implications of the independence test actually make sense when deciding whether to recommend page B to a user who is viewing page A.


Given that we know P(B | A) and P(B), and we know that 
  1. for independent events P(B | A) = P(B)
  2. for negatively correlated events P(B | A) < P(B)
  3. for positively correlated events P(B | A) > P(B)
then any calculated P(B | A) significantly < P(B) means that recommending page B to a user on page A is likely to disengage the user, because users who have viewed page A are less likely to view page B than users in general are. 
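One way to express that test in code is through the ratio P(B | A) / P(B), often called lift: 1.0 means independence, above 1.0 positive correlation, and below 1.0 negative correlation. This Java sketch is hypothetical; in particular, the tolerance parameter is only a stand-in for "significantly" and is not a statistical significance test.

```java
// Hypothetical helper: compares P(B | A) with P(B) via their ratio (lift).
// The tolerance stands in for "significantly"; it is an assumption, not a
// statistical significance test.
final class CorrelationCheck {

  enum Relation { POSITIVE, INDEPENDENT, NEGATIVE }

  static Relation classify(double pBgivenA, double pB, double tolerance) {
    if (pB <= 0.0) {
      throw new IllegalArgumentException("P(B) must be positive");
    }
    double lift = pBgivenA / pB; // 1.0 means independent
    if (lift > 1.0 + tolerance) {
      return Relation.POSITIVE;
    }
    if (lift < 1.0 - tolerance) {
      return Relation.NEGATIVE;
    }
    return Relation.INDEPENDENT;
  }

  public static void main(String[] args) {
    System.out.println(classify(0.30, 0.10, 0.1)); // prints POSITIVE: recommend B
    System.out.println(classify(0.02, 0.10, 0.1)); // prints NEGATIVE: don't recommend B
  }
}
```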


Calculating Probabilities Again
From above, we need to calculate P(B | A) and P(B) to determine if B should be recommended when the user is viewing A. We have P(B | A) from the last post: the output of our last reducer was 

    uriX     uriY=P(Y | X) uriZ=P(Z | X)...


As for calculating raw probabilities, we can do this by taking the output from our group-by-user mapreduce from the second post, which looked like

   user1  pageA:num_views_A_by_user1  pageB:num_views_B_by_user1...

A map job that emits uri:count pairs from each line of that output, and a reduce job that groups by page URI and sums the counts, would give the total number of views per page.


// Required imports (at the top of the enclosing job class file):
// import java.io.IOException;
// import org.apache.hadoop.io.LongWritable;
// import org.apache.hadoop.io.Text;
// import org.apache.hadoop.mapreduce.Mapper;
// The enclosing job class is assumed to define UNIQUE_KEY and the
// AggregateViewCounters enum referenced below.
public static class AggregateViewCountsMapper extends
   Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    public void map(final LongWritable key, final Text value,
        final Context context) {
      // assume output of GroupViewsByUser:
      // user1\turi1:count1\turi2:count2...\turiN:countN
      String[] fields = value.toString().split("\t");

      long totalCount = 0;
      // fields[0] is the user ID; every other field is uri:count
      for (int i = 1; i < fields.length; i++) {
        String rawFieldData = fields[i];
        // split on the LAST colon so URIs such as http://foo.com/page1 survive
        int sep = rawFieldData.lastIndexOf(':');
        if (sep <= 0) {
          context.getCounter(AggregateViewCounters.BAD_URI_SUBFORMAT).increment(1);
          continue;
        }

        try {
          String count = rawFieldData.substring(sep + 1).replaceAll("\\s", "");
          long actualCount = Long.parseLong(count);
          context.write(new Text(rawFieldData.substring(0, sep)),
              new LongWritable(actualCount));
          totalCount += actualCount;
        } catch (NumberFormatException e) {
          // the part after the colon was not a number
          context.getCounter(AggregateViewCounters.BAD_URI_SUBFORMAT).increment(1);
        } catch (IOException e) {
          context.getCounter(
              AggregateViewCounters.MAP_CONTEXT_WRITE_IO_EXCEPTION).increment(1);
          e.printStackTrace();
        } catch (InterruptedException e) {
          context.getCounter(
              AggregateViewCounters.MAP_CONTEXT_WRITE_INTERRUPTED_EXCEPTION).increment(1);
          e.printStackTrace();
        }
      }

      // write this user's total view count to a guaranteed unique key;
      // the reducer's sum for that key is the total across all pages
      try {
        context.write(new Text(UNIQUE_KEY), new LongWritable(totalCount));
      } catch (IOException e) {
        context.getCounter(
            AggregateViewCounters.MAP_CONTEXT_WRITE_IO_EXCEPTION).increment(1);
        e.printStackTrace();
      } catch (InterruptedException e) {
        context.getCounter(
            AggregateViewCounters.MAP_CONTEXT_WRITE_INTERRUPTED_EXCEPTION).increment(1);
        e.printStackTrace();
      }
    }
}



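A special key (UNIQUE_KEY above), which must never collide with a real URI, carries each user's total view count, so the reducer's sum for that key is the total number of views across all pages.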


The reducer is a simple aggregator; the final output would look like this:

pageA  total_view_A_ct
pageB  total_view_B_ct
....
UNIQUE_KEY total_view_all_pages_ct

The number of lines of output is equal to the number of pages in your candidate set, plus one for UNIQUE_KEY.
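For small inputs, the combined effect of the mapper above and a summing reducer can be checked without a cluster. This is a plain-Java sketch, not Hadoop code, and the `__ALL_PAGES__` value is a hypothetical stand-in for UNIQUE_KEY. Summing every uri:count directly into the reserved key gives the same grand total as summing the per-user totals that the mapper emits.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (not Hadoop code) of what the mapper plus a summing
// reducer produce: total views per page, and a grand total under a reserved key.
final class ViewCountAggregation {

  // hypothetical stand-in for UNIQUE_KEY; must never collide with a real URI
  static final String UNIQUE_KEY = "__ALL_PAGES__";

  // input lines use the GroupViewsByUser format: user\turi1:count1\turi2:count2...
  static Map<String, Long> aggregate(List<String> groupedByUserLines) {
    Map<String, Long> totals = new TreeMap<>();
    for (String line : groupedByUserLines) {
      String[] fields = line.split("\t");
      for (int i = 1; i < fields.length; i++) { // fields[0] is the user ID
        int sep = fields[i].lastIndexOf(':');   // URIs may themselves contain ':'
        if (sep <= 0) {
          continue; // malformed field; the mapper counts these instead
        }
        long count;
        try {
          count = Long.parseLong(fields[i].substring(sep + 1).trim());
        } catch (NumberFormatException e) {
          continue;
        }
        totals.merge(fields[i].substring(0, sep), count, Long::sum); // per page
        totals.merge(UNIQUE_KEY, count, Long::sum);                  // all pages
      }
    }
    return totals;
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList(
        "user1\thttp://foo.com/a:2\thttp://foo.com/b:1",
        "user2\thttp://foo.com/a:3");
    System.out.println(aggregate(lines));
  }
}
```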


Notes on Final Calculations

I'm leaving this one as a mental exercise:
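If the data set is small (<10k pages), you could write a script to calculate the probabilities of pages A..N by dividing each page's total views by the total view count of all pages (the UNIQUE_KEY count):

P(pageA) = total views of pageA / total view count of all pages
..
P(pageX) = total views of pageX / total view count of all pages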

With those probabilities in place, you can now build a model of views that, for each page A, keeps each item B if 

   P(B | A) > P(B)

Note that building the model consists of getting the item:item output, and filtering items as above.
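Here is a minimal sketch of that filtering step for one line of item-to-item output, assuming the raw probabilities P(Y) already sit in an in-memory lookup table (views of Y / total views). Class and method names are hypothetical; the sketch assumes the reducer's `uri=probability` field format and preserves its greatest-to-least order.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the model-building filter for one line of
// item-to-item output: uriX\turiY=P(Y|X)\turiZ=P(Z|X)...
// rawProbabilities is an assumed lookup table of P(Y) = views of Y / total views.
final class ModelFilter {

  static Map<String, Double> keepCorrelated(String itemToItemLine,
                                            Map<String, Double> rawProbabilities) {
    Map<String, Double> kept = new LinkedHashMap<>(); // keeps greatest-to-least order
    String[] fields = itemToItemLine.split("\t");
    for (int i = 1; i < fields.length; i++) {        // fields[0] is uriX itself
      int sep = fields[i].lastIndexOf('=');          // URIs may contain '=' in queries
      if (sep <= 0) {
        continue;
      }
      String uri = fields[i].substring(0, sep);
      double pYgivenX;
      try {
        pYgivenX = Double.parseDouble(fields[i].substring(sep + 1));
      } catch (NumberFormatException e) {
        continue; // skip malformed probabilities
      }
      Double pY = rawProbabilities.get(uri);
      if (pY != null && pYgivenX > pY) {             // keep Y only if P(Y | X) > P(Y)
        kept.put(uri, pYgivenX);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    Map<String, Double> raw = new HashMap<>();
    raw.put("/b", 0.1);
    raw.put("/c", 0.4);
    System.out.println(keepCorrelated("/a\t/b=0.5\t/c=0.3", raw)); // prints {/b=0.5}
  }
}
```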


If the data set is larger and spans several blocks, then processing the results via mapreduce becomes cumbersome if you don't know the total view count of all pages. I think that this last step definitely requires storing the raw probabilities in a lookup table, then processing the conditional probabilities and filtering the lists by comparing P(B | A) with P(B).


How to build the model and store the data above for a more optimal retrieval during model build time is definitely a completely new blog post :)  

Sunday, June 10, 2012

Calculating Conditional Probability via Mapreduce part 3: item to item mappings

In my last post I gave an example of code that grouped views by user ID. In this post I'm going to show how to take the output of that mapreduce and generate item to item mappings that show the conditional probability of URIs being clicked on, given that a specific URI was clicked on.

The output from the views-by-user-ID reducer had the user ID key, followed by a tab, followed by tab-delimited URIs and counts (each uri:count pair colon separated).

user1\turiX:3\turiY:4\turiZ:12....
user2\turiW:5\turiQ:5
...

We want to generate output that shows the following: given that the user has clicked on X, how many times did they click on Y vs Z, or P? In this output, we are not concerned with specific users, but user behavior in general. We want to sum up general conditional probabilities.

In the mapper, our input is as above; a more concrete example is:

user1\thttp://foo.com/page1:12\thttp://foo.com/page2:23\thttp://foo.com/page3:3

In the mapper we generate permutations of the different key pairs (and ignore the counts for now). In the example above we would generate:

key=>value
http://foo.com/page1=>http://foo.com/page2
http://foo.com/page1=>http://foo.com/page3
http://foo.com/page2=>http://foo.com/page1
http://foo.com/page2=>http://foo.com/page3
http://foo.com/page3=>http://foo.com/page1
http://foo.com/page3=>http://foo.com/page2

Here is the mapper code to generate the permutations:
public static class ItemToItemGroupingMapper extends
   Mapper<LongWritable, Text, Text, Text> {

  @Override
  public void map(final LongWritable key, final Text value,
	final Context context) {
	// this job takes the input from GroupViewsByUser
	// user1\turiX:3\turiY:4\turiZ:12
	String raw = value.toString();

	String fields[] = raw.split("\t");

	if(fields.length == 1) {
		context.getCounter(ItemToItemCounters.BAD_MAP_INPUT_FORMAT)
		.increment(1);
		return;
	}
	// fields[0] is the user ID, we skip that for item-item co-occurrence.
	// Strip the counts first, splitting on the LAST delimiter so that URIs
	// containing ':' (e.g. http://foo.com/page1:12) parse correctly.
	// Assumes INTERNAL_DELIMITER is a plain string such as ":".
	String uris[] = new String[fields.length - 1];
	for(int i = 1; i < fields.length; i++) {
		int sep = fields[i].lastIndexOf(GroupViewsByUser.INTERNAL_DELIMITER);
		if(sep <= 0) {
			context.getCounter(ItemToItemCounters.BAD_MAP_INPUT_FORMAT)
			.increment(1);
			return;
		}
		uris[i - 1] = fields[i].substring(0, sep);
	}

	// permute all ordered pairs of co-occurrences
	for(int i = 0; i < uris.length; i++) {
		for(int j = 0; j < uris.length; j++) {
			if(j == i) {
				continue; // don't count self co-occurrences
			}
			try {
				LOGGER.debug(uris[i]+"=>"+uris[j]);
				context.write(new Text(uris[i]), new Text(uris[j]));
			} catch (IOException e) {
				context.getCounter(ItemToItemCounters.MAP_CONTEXT_WRITE_IO_EXCEPTION)
				.increment(1);
				e.printStackTrace();
			} catch (InterruptedException e) {
				context.getCounter(ItemToItemCounters.MAP_CONTEXT_WRITE_INTERRUPTED_EXCEPTION)
				.increment(1);
				e.printStackTrace();
			}
		}
	}
  }
}

In the reducer, these pairs show up grouped by key, and we can aggregate the counts of each co-occurring URI. Again, we're not factoring in a normalized weighting at map time. In other words, in the example above, we're not factoring in the 12 hits of page1 vs the 23 hits of page2 when counting co-occurrences, because, as noted above, we're ignoring counts and focusing on co-occurrences.

We could do some calculations on these pairs at reduce time: for a specific URI X, we would sum the total co-occurrences per unique URI Y and divide by the total co-occurrences of all URIs with X, to get the share of X's co-occurrences that Y accounts for. We could then reverse sort the URIs by that co-occurrence probability to produce output like this for uriX:

uriX\turiY=P(Y | X)\turiZ=P(Z | X)


a real version would look like:

/foo/bar.html   /foo/baz.html=0.5       /foo/car.html=0.5

As expected, in the example above, the probabilities all sum to 1.0.

The reducer code to aggregate counts is below. It's more complex than the previous GroupViewsByUser code because of the probability calculation and reverse sorting.


public void reduce(Text key, Iterable<Text> input, Context context) {

	Map<Text, Integer> URItoCount = new HashMap<Text, Integer>();
	Iterator<Text> it = input.iterator();
	long totalCount = 0;
	// these are co-occurrences: sum them.
	while (it.hasNext()) {
		// copy: Hadoop reuses the same Text instance for every value
		Text value = new Text(it.next());

		Integer count = URItoCount.get(value);

		if (count == null) {
			URItoCount.put(value, Integer.valueOf(1));
		} else {
			URItoCount.put(value, Integer.valueOf(count + 1));
		}

		totalCount++;
	}

	// emit tab delimited and from greatest to least
	// uri2=prob(uri2 | uri1) uri3=prob(uri3 | uri1)

	// group URIs by probability so that ties are emitted together
	Map<Double, List<Text>> byProb = new HashMap<Double, List<Text>>();
	List<Double> sortedSet = new ArrayList<Double>();

	for (Map.Entry<Text, Integer> entry : URItoCount.entrySet()) {
		double probability = ((double)entry.getValue())/totalCount;
		List<Text> list = byProb.get(probability);

		if(list == null) {
			list = new ArrayList<Text>();
			byProb.put(probability,list);
			sortedSet.add(probability);
		}
		list.add(entry.getKey());
	}

	// sort from greatest to least.
	Collections.sort(sortedSet);
	Collections.reverse(sortedSet);

	// emit uri:value where value = count / total
	StringBuffer sbuf = new StringBuffer();

	for(double probability : sortedSet){
		List<Text> uris = byProb.get(probability);
				
		for(Text uri : uris) {				      
                    sbuf.append(uri.toString()).append(INTERNAL_DELIMITER).append(probability).append(FIELD_DELIMITER);
                }
	}
				
	// convert list to hadoop Text
	String coOccurrenceBuffer = sbuf.toString();
	LOGGER.debug("reduce: key = " + key.toString() + ", value = "
			+ coOccurrenceBuffer);

	Text allViews = new Text(coOccurrenceBuffer);

	try {
		context.write(key, allViews);
	} catch (IOException e) {
		LOGGER.error(e);
		context.getCounter(
				GroupByUserCounters.REDUCE_CONTEXT_WRITE_IO_EXCEPTION)
				.increment(1);
	} catch (InterruptedException e) {
		LOGGER.error(e);
		context.getCounter(
				GroupByUserCounters.REDUCE_CONTEXT_WRITE_INTERRUPTED_EXCEPTION)
				.increment(1);

	}
}
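The probability-keyed map in the reducer above exists mainly to sort URIs by probability. As a design alternative, here is a sketch that sorts the (URI, count) entries directly by count, descending, before dividing by the total. It uses plain Strings instead of Hadoop Text and a hard-coded '=' delimiter, both assumptions, and it breaks ties by URI so the output is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical alternative to grouping URIs by probability: sort the
// (URI, count) entries directly, then divide each count by the total.
final class CoOccurrenceFormatter {

  // coOccurringUris: one entry per co-occurrence with a single key URI,
  // i.e. what the reducer receives as its values
  static String format(Iterable<String> coOccurringUris) {
    Map<String, Integer> counts = new HashMap<>();
    int total = 0;
    for (String uri : coOccurringUris) {
      counts.merge(uri, 1, Integer::sum);
      total++;
    }
    List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
    // highest count first; equal counts ordered by URI for deterministic output
    entries.sort(Map.Entry.<String, Integer>comparingByValue().reversed()
        .thenComparing(Map.Entry.<String, Integer>comparingByKey()));
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, Integer> e : entries) {
      if (sb.length() > 0) {
        sb.append('\t');
      }
      sb.append(e.getKey()).append('=').append(e.getValue() / (double) total);
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.println(format(Arrays.asList("/foo/baz.html", "/foo/car.html")));
  }
}
```

Fed the two co-occurrences of /foo/bar.html from the earlier example, it returns `/foo/baz.html=0.5` and `/foo/car.html=0.5`, tab separated.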


Are we done yet? Well... no. While it's nice to see conditional probabilities, they don't tell us anything about whether any two URIs are correlated. That's next. Sample code can be found at https://github.com/arunxarun/cpsamples.

Tuesday, May 15, 2012

Calculating Conditional Probability via Mapreduce part 2: group by userID

In my last post I walked through the basics of conditional probability in order to get around to actually discussing how I would implement it using mapreduce.

The whole calculation of Conditional Probability is a little contrived (I admitted that up front); my goal was to provide a 'non word count' example of mapreduce. It also requires multiple mapreduce steps. I'll provide the context around those steps to show how they link together to calculate conditional probability and correlation, both of which are required if we are going to recommend or avoid recommending products when a specific product is shown.

The entire source for all samples (as I write them) can be found at:

https://github.com/arunxarun/cpsamples


Starting Assumptions: the User View Log.
Log format is dependent on who wrote the application, but if you were using something like Common Log Format, you would see the following kind of log:

127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /products/p1234.html HTTP/1.0" 200 2326

where:

  1. frank is the userid
  2. The next field, in brackets, is the timestamp
  3. The quoted string afterward contains the HTTP verb, the URI, and the protocol version
  4. The next field contains the HTTP status code
  5. The final field is the size of the response in bytes, not counting the response headers.
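The GroupViewsByUser mapper splits each line on tabs, so it assumes a tab-delimited variant of this format. Real Common Log Format lines are space-delimited, with the timestamp in brackets and the request in quotes, so a regular expression is a more robust way to pull out the user ID and URI. This is a sketch with a hypothetical class name; it returns null for lines that don't match.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch for space-delimited Common Log Format lines:
// host ident authuser [date] "method uri protocol" status bytes
final class CommonLogParser {

  private static final Pattern CLF = Pattern.compile(
      "^(\\S+) (\\S+) (\\S+) \\[([^\\]]+)\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+|-)$");

  // Returns {userId, uri}, or null if the line is not in Common Log Format.
  static String[] userAndUri(String line) {
    Matcher m = CLF.matcher(line.trim());
    if (!m.matches()) {
      return null;
    }
    return new String[] {m.group(3), m.group(6)}; // authuser and request URI
  }

  public static void main(String[] args) {
    String[] parsed = userAndUri(
        "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /products/p1234.html HTTP/1.0\" 200 2326");
    System.out.println(parsed[0] + " " + parsed[1]); // prints frank /products/p1234.html
  }
}
```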

Mapreduce 1: group by UserID
The first mapreduce will group all pages by user during the map phase and sum page counts by user during the reduce phase. It does this by parsing the input lines and emitting the user and URI as a pair into the context:
    
/**
 * the key is the user, the value is the URI
 *
 */
public static class GroupViewsByUserMapper extends Mapper<LongWritable, Text, Text, Text> {
 
  @Override
  public void map(final LongWritable key, final Text value, final Context context) {
    // assume common log format fields, but tab-delimited:
    //127.0.0.1\t-\tfrank\t[10/Oct/2000:13:55:36 -0700]\t"GET /apache_pb.gif HTTP/1.0"\t200\t2326\n
    String raw = value.toString();
  
    String fields[] = raw.split("\t");
  
    if(fields.length < 6 || fields.length > 7) {
        context.getCounter(GroupByUserCounters.BAD_LOG_FORMAT).increment(1);
        return;
    }
  
    String userIdText = fields[2];
    String rawHttpInfo = fields[4];
   
    String uriComponents[] = rawHttpInfo.split(" ");
  
    if(uriComponents.length < 3) {
         context.getCounter(GroupByUserCounters.BAD_URI_SUBFORMAT).increment(1);
         return;
    }
    Text userId = new Text(userIdText);
    Text uri = new Text(uriComponents[1]);

    try {
       context.write(userId, uri);
    } catch (IOException e) {
         LOGGER.error(e);
         context.getCounter(GroupByUserCounters.MAP_CONTEXT_WRITE_IO_EXCEPTION)
            .increment(1);
    } catch (InterruptedException e) {
         LOGGER.error(e);
         context.getCounter(GroupByUserCounters.MAP_CONTEXT_WRITE_INTERRUPTED_EXCEPTION)
            .increment(1);
    }
  
  }
}    


The reduce phase aggregates counts per URI for each user into a text-based map. Note that while Sequence Files are the more efficient way to write this information, we're using Text output to minimize complexity.

user1\t/pages/foo.html:2\t/pages/bar.html:1\t/pages/baz.html:1

/**
 * emit user: uri1:count1...uriN:countN 
 *
 */
public static class GroupViewsByUserReducer extends Reducer<Text, Text, Text, Text> {

  /**
   * aggregate one user's URIs into uri:count pairs
   */
  @Override
  public void reduce(Text key, Iterable<Text> input, Context context) {
  
    Map<Text, Integer> URItoCount = new HashMap<Text, Integer>();
    Iterator<Text> it = input.iterator();
  
    // aggregate views by uri.
    while(it.hasNext()) {
      // copy: Hadoop reuses the same Text instance for every value
      Text value = new Text(it.next());
   
      Integer count = URItoCount.get(value);
   
      if(count == null) {
        URItoCount.put(value, Integer.valueOf(1));
      } else {
        URItoCount.put(value, Integer.valueOf(count+1));
      }
   
    }
  
    // emit view:count format, tab delimited.
  
    StringBuffer sbuf = new StringBuffer();
  
    for(Map.Entry<Text, Integer> entry : URItoCount.entrySet()) {
      sbuf.append(entry.getKey().toString()).append(":").append(entry.getValue());
      sbuf.append("\t");
    }
  
    // convert list to hadoop Text
    String viewBuffer = sbuf.toString();
    LOGGER.debug("reduce: key = "+key.toString()+", value = "+viewBuffer);
    Text allViews = new Text(viewBuffer);
  
    try {
      context.write(key, allViews);
    } catch (IOException e) {
      LOGGER.error(e);
      context.getCounter(GroupByUserCounters.REDUCE_CONTEXT_WRITE_IO_EXCEPTION)
         .increment(1);
    } catch (InterruptedException e) {
      LOGGER.error(e);
      context.getCounter(GroupByUserCounters.REDUCE_CONTEXT_WRITE_INTERRUPTED_EXCEPTION)
         .increment(1);
   
    }
  }
}

A Final Note: Unit Testing
I'd like to point out some quick helper classes that I used to aid my unit testing; these are at

https://github.com/arunxarun/cpsamples/tree/master/src/main/java/org/arunxarun/mapreduce/unitmocks.

I've started to use these (and/or variants of them) to quickly validate that my mappers and reducers are emitting what I think they should: the last few lines of the test below retrieve the mapped values from the MockRecordWriter and validate them at map time:


        @Test
 public void testMapperValidInput() throws IOException, InterruptedException {

  GroupViewsByUser.GroupViewsByUserMapper mapper = new GroupViewsByUser.GroupViewsByUserMapper();

  MockRecordWriter rw = new MockRecordWriter();
  Mapper.Context context = getMapperContext(mapper,rw);


  LongWritable key = new LongWritable(1);
  Text value = new Text("127.0.0.1\t-\tfrank\t[10/Oct/2012:13:55:36 -0700]\t\"GET /products/prod2.html HTTP/1.0\"\t200\t1002\n");
  mapper.map(key, value, context);
  LongWritable key2 = new LongWritable(2);
  Text value2 = new Text("127.0.0.1\t-\tjoe\t[10/Oct/2012:14:35:22 -0700]\t\"GET /products/prod1.html HTTP/1.0\"\t200\t2326\n");
  mapper.map(key2, value2, context);
  Map map = rw.getMap();
  assertNotNull(map);
  assertTrue(map.size() ==  2);
  assertNotNull(map.get(new Text("joe")));
  assertNotNull(map.get(new Text("frank")));



 }


The getMapperContext() method uses most of these stubs (which return empty values) to build a context object that can be passed into the mapper.

        /**
  * return a valid mapper context.
  * 
  * @param mapper
  * @param rw
  * @return
  * @throws IOException
  * @throws InterruptedException
  */
 private Mapper.Context getMapperContext(
   Mapper mapper,
   MockRecordWriter rw) throws IOException,
   InterruptedException {
  Configuration conf = new Configuration();
  TaskAttemptID taskId = new TaskAttemptID();

  Mapper.Context context = mapper.new Context(
    conf, taskId, new MockRecordReader(), rw,
    new MockOutputCommitter(), new MockStatusReporter(),
    new MockInputSplit());

  return context;

 }

The reducer logic is also testable:


        @Test
 public void testReducerValidInput() throws IOException,
   InterruptedException {

  GroupViewsByUser.GroupViewsByUserReducer reducer = 
                  new GroupViewsByUser.GroupViewsByUserReducer();

  Configuration conf = new Configuration();
  TaskAttemptID taskId = new TaskAttemptID("foo", 1, true, 1, 12);
  MockRecordWriter dwr = new MockRecordWriter();

  Reducer.Context context = getReducerContext(reducer,dwr,conf,taskId); 


  Iterable input = new Iterable() {

   @Override
   public Iterator iterator() {
    List list = new ArrayList();

    list.add(new Text("http://foobar.com"));
    list.add(new Text("http://foobar.com"));
    list.add(new Text("http://foobar.com"));
    list.add(new Text("http://foobar.com"));

    return list.iterator();
   }

  };

  Text key = new Text("userkey");

  reducer.reduce(key, input, context);

  Map<Text, Text> map = dwr.getMap();

  assertNotNull(map);
  assertEquals(1, map.size());
  assertNotNull(map.get(new Text("userkey")));
  Text value = map.get(key);
  assertEquals("http://foobar.com:4\t", value.toString());

 }


Again, getReducerContext() returns a mocked-up context that is passed into the reducer, and afterwards the MockRecordWriter can be interrogated for the results. Here is getReducerContext():


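 /**
  * Return a valid reducer context built from the mock objects.
  *
  * @param reducer the reducer under test
  * @param dwr     the record writer that captures the reducer's output
  * @param conf    the job configuration
  * @param id      the task attempt ID
  * @return a Reducer.Context wired to the mocks
  * @throws IOException
  * @throws InterruptedException
  */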
 private Reducer.Context getReducerContext(
   Reducer reducer, 
   MockRecordWriter dwr,
   Configuration conf, 
   TaskAttemptID id) throws IOException, 
   InterruptedException {

  MockOutputCommitter doc = new MockOutputCommitter();
  MockStatusReporter dsr = new MockStatusReporter();
  MockRawKeyValueIterator drkv = new MockRawKeyValueIterator();


  Reducer.Context context = reducer.new Context(
  conf, id, drkv, new MockCounter(), new MockCounter(), dwr,
  doc, dsr, null, Text.class, Text.class);

  return context;
 }
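To make the expected string in testReducerValidInput() easier to read, here is a hypothetical plain-Java sketch of an aggregation that would produce it: count each URL in one user's values and emit "url:count" entries, each followed by a tab. ViewCounter.formatCounts is an illustrative name. The real GroupViewsByUserReducer is not shown here and may differ, for example in how it orders multiple URLs.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical per-user aggregation (not the real reducer): counts each URL
 * and formats the result as "url:count" entries, each followed by a tab.
 */
final class ViewCounter {

    static String formatCounts(Iterable<String> urls) {
        // LinkedHashMap keeps URLs in first-seen order, so the output is deterministic
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String url : urls) {
            counts.merge(url, 1, Integer::sum);
        }
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            out.append(entry.getKey()).append(':').append(entry.getValue()).append('\t');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        List<String> views = Arrays.asList(
                "http://foobar.com", "http://foobar.com",
                "http://foobar.com", "http://foobar.com");
        // Same string the reducer test asserts: "http://foobar.com:4" plus a tab
        System.out.println(formatCounts(views));
    }
}
```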

In my next post I'll discuss how to take the output of this job and generate item-to-item mappings.

Sunday, March 18, 2012

Calculating Conditional Probability via Mapreduce Part 1: the theory

Motivation:

A colleague of mine was bemoaning the lack of non-trivial mapreduce examples out there, and wanted to teach a class on how to apply mapreduce to "something other than word count". This sounded like a great idea to me, so I volunteered to provide a mapreduce implementation of Conditional Probability (CP) calculations from user clickstream data.

CP calculation is one of many collaborative filtering techniques used to deliver personalized product and content recommendations to users at sites like Amazon, Pandora, and Netflix. Conditional Probability is the probability of one event happening given that another has occurred. Calculating it for page views can help determine which pages are correlated, and suggesting correlated pages to a user can greatly enhance their browsing, and ultimately their purchasing, experience.

For a web application, page correlation can be determined because user actions map to page views of products or content, and those page views are logged by the application servers. From those logs we can determine which pages are more likely to be viewed together by calculating potential correlations between all available pages. For any large-scale website, this means gigabytes of log data per day need to be processed to calculate CP and correlation; applying mapreduce in this case allows that log data to be processed in parallel.

Implementing CP calculations via mapreduce is a great way to start understanding how collaborative filtering is done at scale. It is non-intuitive at first, but it ends up being a great way to understand how mapreduce works. Of course, CP calculation is only one step in a larger process; another step is determining page correlation once CP has been calculated.

For me, it was necessary to understand Conditional Probability in order to implement the CP calculation correctly and to understand how to use it to determine whether events are correlated. That in turn requires a basic understanding of Probability. Note: my interpretation of basic probability concepts is solely mine, and I welcome all comments and corrections :)

Probability Overview:

The probability that an event occurs can be stated as P(A) = (total count of event A)/(total count of all events). For example, if I say there is a 5% probability that I will introduce a bug into our source code, that means that, on average, 5 out of every 100 commits I make contain a bug. People on my team would tend to round that number upward, but they only have anecdotal evidence so far :)
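The definition translates directly into code. A minimal sketch, where EventProbability.probability is an illustrative name and the counts are plain integers:

```java
/** P(A) = (count of event A) / (count of all events), as defined above. */
final class EventProbability {

    static double probability(long countOfEvent, long countOfAllEvents) {
        if (countOfAllEvents <= 0 || countOfEvent < 0 || countOfEvent > countOfAllEvents) {
            throw new IllegalArgumentException(
                    "need countOfAllEvents > 0 and 0 <= countOfEvent <= countOfAllEvents");
        }
        return (double) countOfEvent / countOfAllEvents;
    }

    public static void main(String[] args) {
        // The bug example: 5 buggy commits out of every 100 commits
        System.out.println(probability(5, 100)); // 0.05
    }
}
```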

At this point it's useful to draw a picture. The Probability Space for a set of potential events is the representation of all possible events that can happen, with their assigned probabilities, expressed as P(Event).



In the Venn Diagram above, the intersection of A and B is shown in purple and is written A ∩ B. This is the region of the probability space where both A and B occur, and its probability is P(A ∩ B). The complete space covered by A and B is written A ∪ B, the union of A and B.

If two events are mutually exclusive, that means they can never occur together. In the above example, Event C is mutually exclusive with both A and B.

Independent events (each with a nonzero probability) are never mutually exclusive: independent events can occur together, and mutually exclusive events cannot. For example, I can wear a blue shirt and/or commit a bug. These two events may be independent; they are not mutually exclusive. I cannot wear a blue shirt and wear no shirt at the same time; those two events are mutually exclusive.

So events that can co-occur may be independent; in other words, they may not be correlated. If A and B are independent, then the probability that one happens does not depend on whether the other happens. They may also be dependent: B may occur more frequently after A occurs. I'll discuss determining independence more after providing a definition of Conditional Probability.

Conditional Probability, Defined and Applied

Conditional Probability, the probability of B occurring when A has occurred, is written as P(B | A). From the Venn Diagram above, we are explicitly interested in the B ∩ A area (the intersection of A and B), but only as a fraction of the area where A occurs. This can be expressed as

P(B | A) = P(B ∩ A)/P(A)
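Applied to clickstream data, the same equation can be estimated from counts. The sketch below is illustrative only: it assumes each user's set of viewed pages counts as one trial and that "event A" means "this user viewed page A". ConditionalProbability.pGiven and the sample data are made up.

```java
import java.util.List;
import java.util.Set;

/**
 * Estimates P(B | A) = P(B and A) / P(A) from per-user sets of viewed pages.
 * Assumption (illustrative): each user is one trial, and "event A" means
 * "this user viewed page A".
 */
final class ConditionalProbability {

    static double pGiven(String pageB, String pageA, List<Set<String>> pagesByUser) {
        int total = pagesByUser.size();
        int countA = 0;     // users who viewed A
        int countBandA = 0; // users who viewed both A and B
        for (Set<String> pages : pagesByUser) {
            if (pages.contains(pageA)) {
                countA++;
                if (pages.contains(pageB)) {
                    countBandA++;
                }
            }
        }
        if (countA == 0) {
            throw new IllegalArgumentException("P(A) is 0, so P(B | A) is undefined");
        }
        double pA = (double) countA / total;
        double pBandA = (double) countBandA / total;
        return pBandA / pA;
    }

    public static void main(String[] args) {
        List<Set<String>> pagesByUser = List.of(
                Set.of("prod1", "prod2"),
                Set.of("prod1", "prod3"),
                Set.of("prod1", "prod2", "prod3"),
                Set.of("prod3"));
        // 3 of 4 users viewed prod1, and 2 of those 3 also viewed prod2
        System.out.println(pGiven("prod2", "prod1", pagesByUser));
    }
}
```

Computing P(B ∩ A) and P(A) separately and then dividing mirrors the equation; it gives the same result as dividing the two counts directly, because the total number of users cancels.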

A Venn Diagram doesn't show the 'in between state' of A (occurred) and B (not occurred) very well. A better way to visualize CP is to use a Probability Tree. In this tree, events are represented as nodes, and each set of branches leaving a node represents the probabilities for a single event. Subsequent events branch off the original event nodes. The probabilities on the branches that share a common origin node must sum to 1 (100%).


In the tree above, I'm using some new notation. A' is the complement of A, so P(A') = 1 - P(A) is the probability that A will not happen. The complementary probability is explicit in a probability tree, whereas it is only implied in a Venn diagram.

This tree shows the possible state map for A and B, in a specific order. If A occurs, there is a probability that B will or will not occur, noted by the conditional probabilities. The same set of possible B states is available if A doesn't occur. The probability tree makes it clear that conditional probabilities represent the state where A (or A') has occurred.

The tree above is incomplete because it doesn't assign actual probabilities to the states. Possible probabilities are shown below.

To get the actual probability of an end state, you traverse the tree and multiply the values of the branches traversed. This works because of the previous definitions: reaching B by way of A means that both A and B occurred, which is B ∩ A. Rearranging the equation above,

P(B | A) = P(B ∩ A)/P(A)  ->
P(B ∩ A) = P(A)*P(B | A)

P(A) = 25%, P(B | A) = 33%, P(B ∩ A) = 25%*33% = 8.25%

The probability tree above defines an event space with two events that can co-occur. One thing it doesn't give us directly is P(B). But we can get P(B) by adding up end states, because every path through the tree goes through either A or A'. P(B) can be expressed as P(B ∩ A) + P(B ∩ A'). If we traverse A->B to get P(B ∩ A), we get P(A)*P(B | A). If we traverse A'->B to get P(B ∩ A'), we get P(A')*P(B | A'). The sum of these states is the total probability of B happening in this event space:

P(B) = P(A)*P(B | A) + P(A')*P(B | A')
        = 25%*33% + 75%*55% = 8.25% + 41.25% = 49.5%
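The tree traversal can be written out as a small class. This is a sketch using the branch values from the example (P(A) = 25%, P(B | A) = 33%, P(B | A') = 55%); the class and method names are illustrative. Multiplying along a path gives the probability of that end state, and adding the end states that contain B gives P(B).

```java
/**
 * Two-level probability tree for events A and B (illustrative sketch).
 * Each node's outgoing branches sum to 1, so only one branch per node is stored.
 */
final class ProbabilityTree {
    private final double pA;          // root -> A
    private final double pBgivenA;    // A -> B
    private final double pBgivenNotA; // A' -> B

    ProbabilityTree(double pA, double pBgivenA, double pBgivenNotA) {
        for (double p : new double[] { pA, pBgivenA, pBgivenNotA }) {
            if (p < 0.0 || p > 1.0) {
                throw new IllegalArgumentException("probabilities must be in [0, 1]: " + p);
            }
        }
        this.pA = pA;
        this.pBgivenA = pBgivenA;
        this.pBgivenNotA = pBgivenNotA;
    }

    /** Complement of A: the other branch leaving the root. */
    double pNotA() { return 1.0 - pA; }

    /** Path root -> A -> B: P(B and A) = P(A) * P(B | A). */
    double pBandA() { return pA * pBgivenA; }

    /** Path root -> A' -> B: P(B and A') = P(A') * P(B | A'). */
    double pBandNotA() { return pNotA() * pBgivenNotA; }

    /** Total probability: add up the end states that contain B. */
    double pB() { return pBandA() + pBandNotA(); }

    public static void main(String[] args) {
        ProbabilityTree tree = new ProbabilityTree(0.25, 0.33, 0.55);
        System.out.println("P(B and A) = " + tree.pBandA());
        System.out.println("P(B)       = " + tree.pB());
    }
}
```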

Conditional Probability and Independent Events

All of that is neat (really!), but how do we tie it back to providing real value to our site? We do this by realizing how CP helps determine whether two events are independent or not. If A and B are truly independent, then

P(B | A) = P(B).

In other words, B is independent from A when B is just as likely to happen whether or not A happens. An example of this is two consecutive rolls of a die: the probability of getting a 1 on the second roll is not affected by the value obtained on the first roll.

Contrast this with two dependent events. Say I have 5 pairs of black socks (B) and 5 pairs of white socks (W) in a drawer, and I want the probability P(B) of getting a black pair when I pull a pair out. It starts at 50%, but it changes after my first pull (assuming I don't put the pair back), because there is one less pair of black or white socks in the drawer: if the first pair was black, the chance that the second pair is black drops to 4/9 (about 44%); if it was white, the chance rises to 5/9 (about 56%).
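Both examples can be checked exactly by enumerating every equally likely outcome. In this sketch (names are illustrative), A is "the first roll is 1" and B is "the second roll is 1" for the dice; for the socks, A is "the first pair is black" and B is "the second pair is black", assuming the first pair is not put back.

```java
/**
 * Exact enumeration of equally likely outcomes for the two examples above.
 * Dice: all 36 ordered (first, second) rolls of a fair six-sided die.
 * Socks: all 90 ordered draws of two different pairs from a drawer holding
 * 5 black and 5 white pairs (the first pair is not put back).
 */
final class IndependenceExamples {

    /** Dice: returns {P(second is 1 | first is 1), P(second is 1)}. */
    static double[] dice() {
        int total = 0, firstIsOne = 0, secondIsOne = 0, both = 0;
        for (int first = 1; first <= 6; first++) {
            for (int second = 1; second <= 6; second++) {
                total++;
                if (first == 1) firstIsOne++;
                if (second == 1) secondIsOne++;
                if (first == 1 && second == 1) both++;
            }
        }
        return new double[] { (double) both / firstIsOne, (double) secondIsOne / total };
    }

    /** Socks: returns {P(second is black | first is black), P(second is black)}. */
    static double[] socks() {
        final int black = 5, pairs = 10; // pairs 0-4 are black, 5-9 are white
        int total = 0, firstBlack = 0, secondBlack = 0, both = 0;
        for (int first = 0; first < pairs; first++) {
            for (int second = 0; second < pairs; second++) {
                if (first == second) continue; // the same pair cannot be drawn twice
                total++;
                boolean firstIsBlack = first < black;
                boolean secondIsBlack = second < black;
                if (firstIsBlack) firstBlack++;
                if (secondIsBlack) secondBlack++;
                if (firstIsBlack && secondIsBlack) both++;
            }
        }
        return new double[] { (double) both / firstBlack, (double) secondBlack / total };
    }

    public static void main(String[] args) {
        double[] d = dice();
        double[] s = socks();
        System.out.println("dice:  P(2nd is 1 | 1st is 1) = " + d[0] + ", P(2nd is 1) = " + d[1]);
        System.out.println("socks: P(2nd black | 1st black) = " + s[0] + ", P(2nd black) = " + s[1]);
    }
}
```

For the dice both numbers come out to 1/6, so P(B | A) = P(B). For the socks P(B | A) is 4/9 while P(B) is 1/2, so the two draws are dependent.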

If P(B | A) = P(B), and

P(B | A) = P(B ∩ A)/P(A), then for independent events,

P(B) = P(B ∩ A)/P(A) because P(B | A) = P(B), and multiplying both sides by P(A) gives

P(B)*P(A) = P(B ∩ A)

So if B is independent from A, the probability of both A and B happening, P(B ∩ A), is the probability of B multiplied by the probability of A.

If P(B ∩ A) > P(B)*P(A), the two are positively correlated: B is more likely to occur when A occurs than it would be if the two were independent.

If P(B ∩ A) < P(B)*P(A), the two are negatively correlated: B is less likely to occur when A occurs than it would be if the two were independent.
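These rules can be turned into a small classifier. A sketch, under two assumptions: the probabilities are already known or estimated, and because estimates are rarely exactly equal, "independent" means "equal within a small tolerance epsilon". Dependence.classify is an illustrative name.

```java
/**
 * Classifies two events by comparing P(B and A) with P(B) * P(A), following the
 * rules above. epsilon is an assumed tolerance for treating the two as equal.
 */
final class Dependence {
    enum Kind { POSITIVE, INDEPENDENT, NEGATIVE }

    static Kind classify(double pA, double pB, double pBandA, double epsilon) {
        double expectedIfIndependent = pA * pB;
        if (pBandA > expectedIfIndependent + epsilon) {
            return Kind.POSITIVE;
        }
        if (pBandA < expectedIfIndependent - epsilon) {
            return Kind.NEGATIVE;
        }
        return Kind.INDEPENDENT;
    }

    public static void main(String[] args) {
        // Values from the probability tree: P(A) = 0.25, P(B) = 0.495, P(B and A) = 0.0825
        System.out.println(classify(0.25, 0.495, 0.0825, 1e-9)); // NEGATIVE
    }
}
```

With the numbers from the probability tree, P(B ∩ A) = 8.25% is below P(B)*P(A) = 25% * 49.5% = 12.375%, so that example is negatively correlated. Equivalently, P(B | A) = 33% is lower than P(B) = 49.5%.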


Variable (in)dependence is something to consider when recommending products or content. If we recommend content that we think is correlated when it is actually (a) independent or (b) negatively correlated, that recommendation may drown out other recommendations that are actually positively correlated but whose raw probabilities are much lower than those of the independent or negatively correlated items.
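A small illustration with made-up numbers shows the effect. Ranking candidates for viewers of page A by P(X | A) alone puts an independent but popular page ahead of a positively correlated one. Dividing by P(X) (a ratio of 1.0 means independent, above 1.0 means positively correlated) reverses the order. The ratio ranking is just one simple way to account for this, chosen for the sketch; it is not necessarily the approach used in the next post.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * Hypothetical numbers for users who viewed page A:
 *   "popular": P(X) = 0.60, P(X | A) = 0.60 (independent of A)
 *   "niche":   P(X) = 0.05, P(X | A) = 0.20 (positively correlated with A)
 */
final class RecommendationRanking {

    static final class Candidate {
        final String page;
        final double pGivenA; // P(X | A)
        final double p;       // P(X)

        Candidate(String page, double pGivenA, double p) {
            this.page = page;
            this.pGivenA = pGivenA;
            this.p = p;
        }

        /** P(X | A) / P(X): 1.0 = independent, above 1.0 = positively correlated. */
        double ratio() {
            return pGivenA / p;
        }
    }

    static final Comparator<Candidate> BY_CONDITIONAL =
            Comparator.comparingDouble((Candidate c) -> c.pGivenA).reversed();
    static final Comparator<Candidate> BY_RATIO =
            Comparator.comparingDouble(Candidate::ratio).reversed();

    /** Returns page names sorted by the given order (highest first for both comparators). */
    static List<String> rank(List<Candidate> candidates, Comparator<Candidate> order) {
        List<Candidate> sorted = new ArrayList<>(candidates);
        sorted.sort(order);
        List<String> pages = new ArrayList<>();
        for (Candidate c : sorted) {
            pages.add(c.page);
        }
        return pages;
    }

    public static void main(String[] args) {
        List<Candidate> candidates = List.of(
                new Candidate("popular", 0.60, 0.60),
                new Candidate("niche", 0.20, 0.05));
        System.out.println(rank(candidates, BY_CONDITIONAL)); // [popular, niche]
        System.out.println(rank(candidates, BY_RATIO));       // [niche, popular]
    }
}
```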

Conclusion (So Far)

As discussed above, calculating CP is part of a larger effort to determine whether pages are positively correlated, independent, or negatively correlated. To do this for two events A and B, we need to know P(B | A), P(B), and P(A). In my next post I'll provide a high-level algorithm to get these numbers and determine correlated page views from a generic server log containing pages viewed by user ID.





Wednesday, November 2, 2011

Schema On Read? Not so fast!

I just got back from HadoopWorld. I have many thoughts on what I saw and heard there, but that is probably a separate post. I've been trying to write this post for the last 3 months, and HadoopWorld gave me the clarity I needed to finish it.

There was one phrase I kept hearing in the halls and meeting rooms: "Schema on Read". Schema on Read, in contrast to Schema on Write, gives you the freedom to define your schema after you've stored your data. Schema on Read sounds like Freedom. And who wouldn't like Freedom from the Tyranny of Schemas?

If, by the way, that phrase rings a bell, it may be because of this:


All Downfall meme kidding aside, Schema on Read sure seems nice. Since none of the current NoSQL storage engines enforce a schema down to the column level, can we just not worry about schemas? What's the point?

Schemas -- good for the consumer, bad for the producer. 

The point is that schemas are a guarantee to a consumer of the data. They guarantee that the data follows a specific format, which makes it easier to consume. That's great for the consumer. They get to write processes that can safely assume that the data has structure.

But...not without a cost, to someone. For the producer of the data, schemas can suck, for several reasons:
  1. Because you never get them right the first time, and you're left doing schema versioning and writing upgrade scripts to compensate for your sins of omission -- basically you get nailed for not being omniscient when you designed the schema. 
  2. Because they don't do well with variability. If you have data with a high rate of variability, your schema is guaranteed to change every time you encounter values/types/ranges that you didn't expect. 

The various parties pumping freedom from schemas via NoSQL technologies seem to have an additional, implicit message: even though you don't have to lock your data down to a schema, you still get the benefits of having one, because the data is still usable. Specifically, if you don't define the data, or define it only partially, you can still get value from it simply because you're storing it. Is that true?

Sure it is. Sort of. Take a file in HDFS. If the file isn't formatted in a specific manner, can it still be processed? As long as you can split it, you can process it. Will the processing code need to account for an undefined range of textual boundary conditions? Absolutely. That code is all but guaranteed to break in arbitrary ways, because the format of the data is arbitrary.

The same thing can happen with column families. Any code that processes a schema-free column family needs to be prepared to deal with any kind of column that is added into that column family. Again, the processing code needs to be incredibly flexible to deal with potentially unconstrained change. Document stores are another example where even though the data is parse-able, your processing code may need to achieve sentience a la Skynet in order to process it.

So, yes, you can get value from randomly formatted data, if you can write bulletproof, highly adaptable code. That will eventually take over the world and produce cyborgs with Austrian accents that travel back in time.

But what about those of us that process (semi) structured data? Web logs, for example. Or (XML/JSON) data feeds. Things that have some kind of structure, where the meaning and presence -- aka the semantics -- of fields may change but the separators between them don't. Do we really need freedom from the tyranny of something that guarantees structure when we are processing things that have a basic structure?

Yes. Even though the format may be well defined, the semantics can be quite variable. Fields may be optional; mileage may vary. Putting a schematic constraint on all data up front would eliminate one of the key bonuses of big data platforms: if we had to adhere to a well-defined format before loading the data, we couldn't load it, and so we couldn't clean it. In the big data world, imposing a schema early on would not only suck, it would suck at scale.

However, the moment we have done some kind of data cleansing and have data that is ready for consumption, it makes sense to bind that data to a schema. Note: by schema I mean a machine-readable definition of the data. JSON keys and values work quite well, regardless of the actual storage engine.

Because the moment you guarantee your data can adhere to a schema, you liberate your data consumers. You give them...freedom! Freedom from....the tyranny of undefined data!
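As a sketch of what a machine-readable schema can look like once the data has been cleansed, here is a hypothetical RecordSchema that maps field names to expected Java types and checks records represented as maps (the way parsed JSON keys and values often are). The class, its methods, and the field names are illustrative assumptions, not a particular library's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical machine-readable schema: field name -> expected Java type.
 * Records are maps, similar to parsed JSON keys and values.
 */
final class RecordSchema {
    private final Map<String, Class<?>> fields = new LinkedHashMap<>();

    /** Declares a required field; returns this schema so calls can be chained. */
    RecordSchema require(String name, Class<?> type) {
        fields.put(name, type);
        return this;
    }

    /** Lists every problem with the record; an empty list means it conforms. */
    List<String> violations(Map<String, Object> record) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, Class<?>> field : fields.entrySet()) {
            Object value = record.get(field.getKey());
            if (value == null) {
                problems.add("missing " + field.getKey());
            } else if (!field.getValue().isInstance(value)) {
                problems.add(field.getKey() + " should be " + field.getValue().getSimpleName());
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        RecordSchema schema = new RecordSchema()
                .require("user", String.class)
                .require("page", String.class)
                .require("status", Integer.class);
        Map<String, Object> clean = Map.of("user", "joe", "page", "/products/prod1.html", "status", 200);
        Map<String, Object> dirty = Map.of("user", "frank", "status", "200");
        System.out.println(schema.violations(clean)); // []
        System.out.println(schema.violations(dirty)); // [missing page, status should be Integer]
    }
}
```

Extra fields are ignored here, which lets the producer add fields without breaking consumers; a stricter schema could reject them instead.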

But wait, that's not all...

What else comes along for free? How about a quality bar by which you can judge all the data you ingest? If your data goes through a cleansing process, you can publish how much data didn't make it through. Consumers can then opt out of consuming the data if too much of it was lost.

And when your data changes (because it will), your downstream customers can opt out, because none of the data will pass validation. This fast failure mode is much preferable to the one in which you discover a month later that your financial reports were wrong because of an undetected format change. That isn't an urban myth; it actually happened to (ahem) a friend of mine, who was left to contemplate the phrase 'pain is a very effective motivator' while scrambling to fix the issue :)
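The quality bar and the fast failure mode can be sketched together. This hypothetical gate (QualityGate and its methods are illustrative names) assumes the tab-separated access log format used in the mapper test above: a line passes cleansing if it has seven fields and a numeric status code. The producer publishes the reject rate, and each consumer decides whether that rate is acceptable.

```java
import java.util.List;

/**
 * Hypothetical cleansing gate for tab-separated access log lines.
 * A line passes if it has 7 fields and a numeric status code (the 6th field).
 */
final class QualityGate {

    static boolean passes(String line) {
        String[] fields = line.trim().split("\t");
        if (fields.length != 7) {
            return false;
        }
        try {
            Integer.parseInt(fields[5]);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    /** The number the producer publishes: the fraction of lines that failed cleansing. */
    static double rejectRate(List<String> lines) {
        if (lines.isEmpty()) {
            return 0.0; // nothing ingested, so nothing rejected
        }
        long rejected = lines.stream().filter(line -> !passes(line)).count();
        return (double) rejected / lines.size();
    }

    /** A consumer's opt-out rule: only consume if the reject rate is tolerable. */
    static boolean consumerAccepts(List<String> lines, double maxRejectRate) {
        return rejectRate(lines) <= maxRejectRate;
    }

    public static void main(String[] args) {
        String good = "127.0.0.1\t-\tjoe\t[10/Oct/2012:14:35:22 -0700]\t"
                + "\"GET /products/prod1.html HTTP/1.0\"\t200\t2326";
        // Simulated upstream format change: fields are now separated by spaces
        String changed = good.replace('\t', ' ');
        List<String> normalDay = List.of(good, good, good, changed);
        List<String> afterChange = List.of(changed, changed, changed);
        System.out.println(rejectRate(normalDay));              // 0.25
        System.out.println(consumerAccepts(afterChange, 0.05)); // false
    }
}
```

When an upstream format change turns tabs into spaces, every line fails, the reject rate hits 100%, and consumers stop right away instead of silently producing wrong reports.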

So what does this all mean?
While the costs of Schema on Write are (a) well known and (b) onerous, Schema on Read is not much better the moment you have to maintain processes that consume the data.

However, by leveraging the flexibility of Hadoop, Cassandra, HBase, Mongo, etc., and loading the data in without a schema, I can then rationalize (clean) the data and apply a schema at that point. This provides freedom to the data producer while they are discovering what the data actually looks like, and freedom to the data consumer because they know what to expect. It also lets the schema evolve over time in a controlled manner that my consumers can opt in or out of.

That's not Schema on Read or Schema on Write, it's more like Eventual Schema. And I think it's a rational compromise between the two.