Wednesday, November 2, 2011

Schema On Read? Not so fast!

I just got back from HadoopWorld. I have many thoughts on what I saw and heard there, but that is probably a separate post. I've been trying to write something for the last 3 months, and HadoopWorld gave me the clarity I needed to finish it.

There was this phrase I kept hearing in the halls and meeting rooms... "Schema on Read". Schema on Read, in contrast to Schema on Write, gives you the freedom to define your schema after you've stored your data. Schema on Read sounds like Freedom. And who wouldn't like Freedom from the Tyranny of Schemas?

If, by the way, that phrase rings a bell, it may be because of this:

[embedded Downfall meme video]

All Downfall Meme kidding aside, Schema on Read sure seems nice. Because there is nothing in any of the current NoSQL storage engines that enforces a schema down to the column level, can we just not worry about schemas? What's the point?

Schemas -- good for the consumer, bad for the producer. 

The point is that schemas are a guarantee to a consumer of the data. They guarantee that the data follows a specific format, which makes it easier to consume. That's great for the consumer. They get to write processes that can safely assume that the data has structure.

But...not without a cost, to someone. For the producer of the data, schemas can suck, for several reasons:
  1. Because you never get them right the first time, and you're left doing schema versioning and writing upgrade scripts to compensate for your sins of omission -- basically you get nailed for not being omniscient when you designed the schema. 
  2. Because they don't do well with variability. If you have data with a high rate of variability, your schema is guaranteed to change every time you encounter values/types/ranges that you didn't expect. 
The various parties pumping freedom from schemas via NoSQL technologies seem to have an additional implicit message -- that even though you don't have to lock down your data to a schema, you still get the benefits of having one -- the data is still usable. Specifically, if you don't define, or only partially define, the data, you can still get value from it. Because you're storing it. Is that true?

Sure it is. Sort of. Take a file in HDFS. If the file isn't formatted in a specific manner, can it still be processed? As long as you can split it, you can process it. Will the processing code need to account for an undefined range of textual boundary conditions? Absolutely. That code will be guaranteed to break arbitrarily because the format of the data is arbitrary.

The same thing can happen with column families. Any code that processes a schema-free column family needs to be prepared to deal with any kind of column that is added into that column family. Again, the processing code needs to be incredibly flexible to deal with potentially unconstrained change. Document stores are another example where even though the data is parse-able, your processing code may need to achieve sentience a la Skynet in order to process it.

So, yes, you can get value from randomly formatted data, if you can write bulletproof, highly adaptable code -- code that will eventually take over the world and produce cyborgs with Austrian accents that travel back in time.

But what about those of us that process (semi) structured data? Web logs, for example. Or (XML/JSON) data feeds. Things that have some kind of structure, where the meaning and presence -- aka the semantics -- of fields may change but the separators between them don't. Do we really need freedom from the tyranny of something that guarantees structure when we are processing things that have a basic structure?

Yes. Even though format may be well defined, semantics can be quite variable. Fields may be optional, mileage may vary.  Putting some kind of schematic constraint on all data invalidates one of the key bonuses of big data platforms -- we wouldn't be able to clean it because we wouldn't be able to load it if we had to adhere to some kind of well defined format. In the big data world, imposing a schema early on would not only suck, it would suck at scale.

However, the moment we have done some kind of data cleansing, and have data that is ready for consumption, it makes sense to bind that data to a schema. Note: by schema I'm talking about a definition of the data that is machine readable. JSON keys and values work quite well, regardless of the actual storage engine.
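
To make that concrete, here's a minimal sketch of what checking a record against such a machine-readable schema might look like, assuming records have already been parsed into key/value maps (the field names are purely illustrative):

import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class RecordValidator {
    // The fields every cleansed record must carry; names are illustrative only.
    private static final List<String> REQUIRED_KEYS =
            Arrays.asList("timestamp", "userId", "eventType");

    /**
     * Returns true if a parsed record (e.g. the keys/values of a JSON document)
     * satisfies the published schema: all required keys present and non-null.
     */
    public static boolean conforms(Map<String, Object> record) {
        for (String key : REQUIRED_KEYS) {
            if (record.get(key) == null) {
                return false; // missing or null field: reject rather than guess
            }
        }
        return true;
    }
}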

Because the moment you guarantee your data can adhere to a schema, you liberate your data consumers. You give them...freedom! Freedom from....the tyranny of undefined data!

But wait, that's not all...

What else comes along for free? How about a quality bar by which you can judge all data you ingest? If your data goes through a cleansing process, you could publish how much data didn't make it through the cleansing process. Consumers could opt out of data consumption if too much data was lost.
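
A minimal sketch of what that quality bar might look like in code (the 5% threshold and all names here are illustrative, not a recommendation):

public class QualityGate {
    /**
     * Fail fast if too many records were rejected during cleansing,
     * instead of silently publishing a degraded data set.
     */
    public static void check(long totalRecords, long rejectedRecords) {
        double rejectedFraction = totalRecords == 0
                ? 1.0
                : (double) rejectedRecords / totalRecords;
        if (rejectedFraction > 0.05) {
            throw new IllegalStateException(
                    "Rejected " + rejectedRecords + " of " + totalRecords
                    + " records; refusing to publish this batch");
        }
    }
}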

And when your data changes (because it will) your downstream customers can opt out because none of the data would pass validation. This fast failure mode is much preferred to the one in which you discover that your financial reports were wrong after a month because of an undetected format change. That isn't an urban myth, it actually happened to -- ahem -- a friend of mine, who was left to contemplate the phrase that 'pain is a very effective motivator' while scrambling to fix the issue :)

So what does this all mean?
While the costs of Schema on Write are (a) well known and (b) onerous, Schema on Read is not much better the moment you have to maintain processes that consume the data.

However, by leveraging the flexibility of Hadoop, Cassandra, HBase, Mongo, etc, and loading the data in without a schema, I can then rationalize (clean) the data and apply a schema at that point. This provides freedom to the data producer while they are discovering what the data actually looks like, and freedom to the data consumer because they know what to expect. It also lets me change over time in a  controlled manner that my consumers can opt in or out of.

That's not Schema on Read or Schema on Write, it's more like Eventual Schema. And I think it's a rational compromise between the two.

Monday, April 18, 2011

Rolling out splittable lzo on CDH3

Until splittable lzo, compression options in HDFS were limited. Gzip generated unsplittable output -- great for reducing allocated block usage, terrible for mapreduce efficiency. Bz2 generated splittable output, but took far too long to be effectively used in production.

When we wanted to start incorporating compression into our storage procedures, splittable lzo was the only rational option to ensure parallel processing of compressed files.

We had tried to use bz2 compression on files prior to ingestion, but it took much longer --  approximately 20x as long as gzip compression on the same file. 


For a 1GB text file, 
  • gzip -1 took ~ 25 seconds (actually, this is strange. I was expecting gzip to be slightly faster than lzo)
  • lzo -1 took ~ 9 seconds, indexing took another 4.
  • bzip2 -1 took  ~ 3 minutes. 

I ran each compression routine at its fastest setting (-1) to provide a relative benchmark; in reality we would run at a slower setting that increased compression.


Installing The Bits


The java and native source for splittable lzo can be found at https://github.com/kevinweil/hadoop-lzo. If you're using the Cloudera distro, you should use the https://github.com/toddlipcon/hadoop-lzo fork.

The cluster I was installing splittable lzo on was running Centos and walled off from the rest of the world. I found it easiest to generate RPMs on a box with the same architecture, then install those RPMs on all nodes in the cluster. I did this using the https://github.com/toddlipcon/hadoop-lzo-packager code, which takes the native and java components and installs them to the right locations. Note that since I was building on a Centos box, I ran


./run.sh --no-deb

to build RPMs only. There were two rpms, the standard one and the debug-info one. The naming convention appears to be YYYYmmDDHHMMSS.full.version.git_hash_of_hadoop_lzo_project.arch, to allow you to upgrade when either the packaging code or the original hadoop lzo code changes.

The RPMs installed the following java and native bits (note that the packager timestamps the jars):

rpm -ql cloudera-hadoop-lzo-20110414162014.0.4.10.0.g2bd0d5b-1.x86_64


/usr/lib/hadoop-0.20/lib/cloudera-hadoop-lzo-20110414162014.0.4.10.0.g2bd0d5b.jar
/usr/lib/hadoop-0.20/lib/native
/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64
/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64/libgplcompression.a
/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64/libgplcompression.la
/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64/libgplcompression.so
/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64/libgplcompression.so.0
/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64/libgplcompression.so.0.0.0

rpm -ql cloudera-hadoop-lzo-debuginfo-20110414162014.0.4.10.0.g2bd0d5b-1.x86_64

/usr/lib/debug
/usr/lib/debug/usr
/usr/lib/debug/usr/lib
/usr/lib/debug/usr/lib/hadoop-0.20
/usr/lib/debug/usr/lib/hadoop-0.20/lib
/usr/lib/debug/usr/lib/hadoop-0.20/lib/native
/usr/lib/debug/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64
/usr/lib/debug/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64/libgplcompression.so.0.0.0.debug
/usr/lib/debug/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64/libgplcompression.so.0.debug
/usr/lib/debug/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64/libgplcompression.so.debug

Hadoop Configuration Changes

After installing the bits via RPMs, there were a couple of changes necessary to get Hadoop to recognize the new codec.

In core-site.xml:

<property>
  <name>io.compression.codecs</name>
  <value>
    org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,
    com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,
    org.apache.hadoop.io.compress.BZip2Codec
  </value>
</property>
<property>
  <name>io.compression.codec.lzo.class</name>
  <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

registers the codec in the codec factory.
In mapred-site.xml:

<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>
<property>
  <name>mapred.map.output.compression.codec</name>
  <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

sets intermediate output to be lzo compressed. After pushing configs out to all nodes in the cluster, I restarted the cluster. The next step was to verify that lzo was installed correctly.
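
If you would rather set this per job instead of cluster-wide, the same two properties can be set programmatically on the job's Configuration. A minimal sketch (the class name is mine; the property names are the ones from the config above):

import org.apache.hadoop.conf.Configuration;

public class LzoMapOutputConf {
    /** Per-job equivalent of the mapred-site.xml settings above (sketch only). */
    public static Configuration apply(Configuration conf) {
        conf.setBoolean("mapred.compress.map.output", true);
        conf.set("mapred.map.output.compression.codec",
                 "com.hadoop.compression.lzo.LzoCodec");
        return conf;
    }
}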

Validation

There were some hiccups I ran into during validation -- all pilot error, but I wanted to put them all in one place for next time. My validation steps looked like this:

(1) create an lzo file that was greater than my block size.
(2) upload and index it.
(3) run a mapreduce using the default IdentityMapper
(4) verify that multiple mappers were run from the one lzo file.
(5) verify that the output was the same size and format as the input.

My first mistake: I lzo compressed a set of files. The splittable lzo code only works with a single file. This took me a while to figure out -- mostly due to tired brain. After I had catted the files together into a single file, then lzo'd that file, I was able to upload it to HDFS and index it:

hadoop jar /usr/lib/hadoop/lib/cloudera-hadoop-lzo-20110414162014.0.4.10.0.g2bd0d5b.jar com.hadoop.compression.lzo.LzoIndexer /tmp/out.lzo

This created an index file. From this great article on the Cloudera site: "Once the index file has been created, any LZO-based input format can split compressed data by first loading the index, and then nudging the default input splits forward to the next block boundaries."

Since I had an uploaded, indexed file at this point, I moved on to steps 3 and 4. Before I could build the IdentityMapper job, I needed to get the LZO bits on my mac so that the code would compile and run.


Detour: Getting the Bits on my Mac

I dev on a Mac, but run the cluster on Centos (I can already feel the wrath of Ted Dziuba coming down from on high). I found the instructions here adequate for getting the IdentityMapper code changes to compile.

Back to Validation

I ran an IdentityMapper on the original source (side note: in 0.20, to run an IdentityMapper, just don't specify a mapper; the default Mapper class implements pass-through mapping). I watched the cluster to make sure that the original file was split out across mappers. It wasn't. I was stumped -- I knew this was something simple, but couldn't see what it was.

After a gentle reminder from Cloudera Support (one of many in the last couple of days, actually :), I set my input format class to LzoTextInputFormat, which -- as the same article above mentions in the next sentence -- "splits compressed data by first loading the index, and then nudges the default input splits forward to the next block boundaries. With these nudged splits, each mapper gets an input split that is aligned to block boundaries, meaning it can more or less just wrap its InputStream in an LzopInputStream and be done." When I had used the default TextInputFormat, the mapreduce worked, but the input was decompressed in a single split rather than being split across mappers.

job.setInputFormatClass(LzoTextInputFormat.class);

Once I had observed splitting behavior from my indexed lzo file by confirming multiple map tasks, I made sure that output was recompressed as lzo by setting FileOutputFormat properties:

FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

This is different from the instructions in Hadoop: The Definitive Guide, and I found it after some googling around. The instructions in the book -- setting properties on the Configuration object -- did not work, most likely because the book was written for an earlier version of Hadoop.

Once I had added those lines to my Tool subclass, I was able to get compressed output that matched my compressed input: the exact result  I was looking for when validating using the IdentityMapper.
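
For reference, here is roughly what my Tool subclass ended up looking like -- a sketch only, with the class name and path handling made up for illustration, and the hadoop-lzo package names as I remember them:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.hadoop.compression.lzo.LzopCodec;
import com.hadoop.mapreduce.LzoTextInputFormat;

public class LzoIdentityJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "lzo identity validation");
        job.setJarByClass(LzoIdentityJob.class);

        // Map-only job: no mapper class set, so the default pass-through Mapper runs.
        job.setNumReduceTasks(0);

        // Use the .index file to split the .lzo input across multiple mappers.
        job.setInputFormatClass(LzoTextInputFormat.class);

        // Recompress the output as lzo.
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new LzoIdentityJob(), args));
    }
}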

Monday, April 11, 2011

HDFS file size vs allocation

Recently, I had to understand HDFS at a deeper level that had nothing to do with running mapreduce jobs or writing to the FileSystem API. Specifically, I had to understand the way that HDFS interacts with the underlying filesystem, and the difference between actual HDFS file size and the way HDFS calculates available storage when using quotas.

We recently discovered a bunch of files that were much smaller than our allocated block size -- on average they took up roughly 1/10th of an allocated block.

This was not the standard small file problem, where the namenode requires too much memory to track metadata for large (10s of millions) numbers of files at 150 bytes of metadata per file.

My immediate conclusion was that these small files were effectively taking up a block at a time, and that we were running out of space -- fast! --  because that was the behavior I thought I was seeing at the HDFS level -- I thought that  storage was allocated a block at a time, and quotas were determined based on available blocks.

That last statement is partially correct. Storage is allocated a block -- actually a block * replication factor -- at a time. However quotas are determined based on available bytes. A space quota, according to the docs is "a hard limit on the number of bytes used by files in the tree rooted at that directory. Block allocations fail if the quota would not allow a full block to be written."

This is what that means: the only time files are measured in blocks is at block allocation time. The rest of the time, files are measured in bytes. The space quota is calculated against the number of bytes, not blocks, left in the cluster. When a user tries to upload a file, that number of remaining bytes is converted into the number of blocks the file would require. The key here is that space is calculated in blocks at allocation time, so no matter how small a file is, you will always need 1 block * replication factor available to put it in the cluster.
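
If you want to see the bytes-based accounting for yourself, the FileSystem API exposes it via getContentSummary(). A minimal sketch (assuming a configured Hadoop client on the classpath, with the directory path passed as an argument):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class QuotaCheck {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        ContentSummary cs = fs.getContentSummary(new Path(args[0]));
        // Actual bytes of file data under the directory.
        System.out.println("length (bytes): " + cs.getLength());
        // Bytes charged against the space quota (roughly length * replication).
        System.out.println("space consumed: " + cs.getSpaceConsumed());
        // -1 means no space quota is set on this directory.
        System.out.println("space quota:    " + cs.getSpaceQuota());
    }
}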

HDFS Operational Details

I spent some time asking, researching, and re-reading the book, and found that making analogies from a standard filesystem to understand HDFS helped me immensely -- to a point (more on that later).

In a standard filesystem, an inode contains file metadata, like permissions, ownership, last time changed, etc, in addition to a set of pointers that point to all blocks that comprise the file. Inodes are kept in a specific location in the filesystem and are used to access files.

The inode and block equivalents in HDFS are distributed across the namenode and the datanode.

The namenode maintains file system metadata, which is analogous to the inodes in a standard FS. This metadata is stored in {dfs.name.dir}/current. Datanodes contain blocks of data, stored as block files in the underlying filesystem.

On the datanode, HDFS stores block data in files in the directory specified by dfs.data.dir, which defaults to {hadoop.tmp.dir}/dfs/data/current. HDFS may create subdirectories underneath that dir to balance out files across directories (many filesystems have a file-per-directory limit). The raw data per block is kept in two files, a blk_NNNN file, and a corresponding blk_NNNN_XXXX.meta file, which contains the block checksum, used in block integrity checks. 

The block file and checksum file information is periodically sent to the namenode as a blockreport -- for example, at HDFS startup (HDFS enters safemode while the namenode processes block reports from its constituent datanodes). Note that each datanode has no idea which block files map to which actual files. It just tracks the blocks. This makes the namenode very critical to HDFS functionality.

To summarize: the metadata that inodes maintain in a standard FS is maintained in the HDFS namenode, and actual file data that is maintained in filesystem blocks in a standard FS is maintained in HDFS blocks on datanodes, which store that block data in block files, maintain checksums of the block data for integrity checking, and update the namenode with information about the blocks they manage.

FileSystem Analogies That Do and Don't Work

In a standard filesystem, disks have a minimum amount of data that they can read or write; this is called a disk block. Unix disk blocks are 512 bytes. Filesystems also have minimum read/write filesystem blocks that are typically 1-2 KB.

Files on a standard filesystem are typically much larger than a block in size. Since most files are not exactly X blocks in size, the 'remainder' of the file that does not fill up a block still takes up that much space on the system. In general (ReiserFS being one exception) the difference between the file's real size and its allocated block size -- the slack -- cannot actually be used for any other file.

In HDFS, if a file is smaller than a block in size, it does not take up an entire HDFS block on disk. There is no concept of HDFS block 'slack space'. A small file takes up as many bytes as it would in a normal filesystem because it is stored as a block file in the normal filesystem. This is where the definition of HDFS block differs from a traditional file block, and this is where my mental model of HDFS as a filesystem failed me :)

While the file and block analogy is valid in HDFS, the size of the blocks makes the difference between file allocated size (always represented in blocks) and file actual size (always in bytes) much larger than it would be on a traditional file system. So you can't treat allocated vs actual size as equivalents, like you effectively can on a traditional filesystem where the block size to file size ratio is relatively tiny. 

Small Files on the Datanodes

At allocation time, a small file will require a single block file per datanode. Note that the actual number of blocks required to store that file on the cluster depends on the HDFS replication policy, which defaults to 3. So factoring in replication, a small (less than 1 block) file is replicated as three identical block files on separate nodes.

That block file is the same size as the small file -- large files would span several blocks and be split into block-sized files -- a large file that was 350MB big on a system with a 128MB block size would be split into 3 blocks, the first two of 128MB, the last one of 94MB. Each of those would be replicated according to the replication policy of the cluster. The only files that don't take up space on the datanodes are zero byte files, which still take up space on the namenode.
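
The allocation math from that example, written out as a small helper (a sketch that just mirrors the arithmetic; HDFS does its own accounting internally):

public class BlockMath {
    /**
     * Block files needed cluster-wide to allocate a file: at least one block,
     * rounded up, times the replication factor.
     */
    public static long blockFilesNeeded(long fileBytes, long blockBytes, int replication) {
        long blocks = Math.max(1, (fileBytes + blockBytes - 1) / blockBytes);
        return blocks * replication;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // 350MB file, 128MB blocks, replication 3 -> 3 blocks * 3 replicas = 9 block files.
        System.out.println(blockFilesNeeded(350 * mb, 128 * mb, 3));
    }
}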

Regardless of actual size, at allocation time, HDFS treats a small file as having a minimum size of one HDFS block per datanode when it is calculating available disk space. So, even if a file is really small, if there is less than three blocks' worth of space available on the cluster (at the default replication factor), the file cannot be stored on the system.

Space Quotas
HDFS only has fewer replicated blocks left than it needs to store a file when it is either running out of space or, more commonly, when there is a space quota on the directory the file is being copied to. Calculating storage cost in blocks lets HDFS safely store data up to a known maximum size, no matter what the actual size of the file is. HDFS will only permit new block creation if there is enough disk space to create a block on N datanodes, where N is the replication factor.

This article shows how HDFS block size, combined with the replication factor, not file actual size, determines available space.

Conclusion

Is this really a problem? Sort of... it's a matter of efficiency. Space quotas are checked against the amount of remaining space on a datanode disk. If a block file takes up 12MB on a system that has 128MB blocks, there are effectively 116MB available to be added back into the available bytes for the space quota -- for a replication factor of 3, that would be 348MB available, or about 2.7 blocks. While you could argue that effectively 0.7 blocks of that space is wasted, 2 blocks of that space is still available for quota calculations. While 2.7 blocks is less than the minimum amount of space required to store a file of _any_ size in an HDFS with a replication factor of 3, if you were to have 2 small files of 12MB, you would have about 5.4 blocks available across the system -- effectively, if you always mod 'leftover space' by the replication factor, at most you are wasting replication-factor-many blocks.

Granted that's not the most efficient use of disk, but it's not as if a small file takes up a 'virtual' block that gets factored in the next time a file is copied into the cluster.

The bigger problem with small files is the lack of efficiency that is encountered in mapreduce operations. Reducing the number of mappers being used and traversing blocks of data at a time is not possible with small files -- one mapper is spun up per file, and the overhead involved in copying the jar file to the task tracker node, starting up the JVM, etc, only makes sense if there is a substantial amount of data to process. You can't go wrong with large files -- they will split across blocks, which are processed more efficiently.

Monday, March 14, 2011

Setting up YCSB for low latency data store testing

Overview 
When confronted with a problem, my first instinct is to look around to see where that problem has been handled before. This is because  I believe that code is a liability, and I want to minimize risk by using code that has been vetted, tested, and put into production by others, and only add to it when necessary.

Right now I have several problems around storing and accessing lots of data in real time. I have several diverse use cases that span applications, but the one thing all of these use cases have in common is that there is no need for transactional integrity. There is also a need for scale beyond what a traditional RDBMS can provide. The first (un)requirement and the second very urgent requirement are pushing me towards (open source) low-latency, NOSQL data stores.

The two kinds of NOSQL data stores I'm looking at are Document Stores and Key-Value stores. Here is a great post discussing the differences between the two.

Some of the questions I need answered to address projects in progress and planned:
  1. What Document Stores and Key-Value Stores out there have heavy adoption rates, a corporate sponsor, or other community support that indicates good performance and support? 
  2. How much 'eventual consistency' can an application live with? If data doesn't need to be transactional, can it really be eventually consistent? 
  3. Is there a Document Store that is fast enough to act as a Key-Value store? It would be easier to manage one piece of software than two. 
  4. How do different Key Value stores compare to one another? Anecdotal evidence is one thing, hard data that I can refer to makes me feel much better.
  5. What happens when I shut a node down? How hard is it to restore?
  6. What are the costs of maintenance of different stores? How hard are they to set up?
I wanted questions 1 and 2 to narrow down the range somewhat, then evaluate that range against 3 and 4 to filter out slower candidates, leaving me with a smaller set to run by questions 5 and 6.

In order to answer 3 and 4 above, I need to compare and contrast both Doc and KV stores in an 'apples to apples' way to gauge performance.

I was psyching myself up to write a generic test framework, when someone pointed me to Brian Cooper  and YCSB, the Yahoo Cloud Serving Benchmark.  I had originally dismissed it as being out of date, but a quick perusal of the code on GitHub convinced me that updating it would not be that hard because it cleanly separates specific database calls from core functionality.

YCSB implements different database client abstraction layers, and provides good documentation on how to set them up: https://github.com/brianfrankcooper/YCSB/wiki/getting-started.

Not Quite Ready For Prime Time

Before I could fully use YCSB, I had to fix up a couple of things. There are patches submitted for some of these fixes in the root project, but they hadn't been accepted yet. It made more sense for me to fork a repo and make the changes I needed (and push them if they hadn't already been pushed up to the upstream origin repo): https://github.com/arunxarun/YCSB

Here are some of the fixes I added; they haven't been integrated into the master repo yet:

  1. The Cassandra7Client needed to be retrofitted to use ByteBuffers instead of byte[]s.
  2. The MongoDbClient was throwing a ClassCastException in the insert() method because it was casting a double encoded in a string to an Integer. 
  3. The MongoDbClient was not connecting to non localhost MongoDB instances because it wasn't appending the database name to the base database url.
  4. There was no truncate functionality. For a Document Store like Mongo, this meant I had to manually truncate the db every time I wanted to reload data. I implemented the truncate method in the DB abstract class (and pushed it to the adaptor classes I used) so that I could do this via YCSB -- see the sketch after this list.
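
For reference, the shape of that change -- a sketch only, since the exact signature in my fork may differ:

// Added to the com.yahoo.ycsb.DB abstract class (illustrative sketch).
/**
 * Delete all records in the given table so a data store can be reloaded
 * cleanly between benchmark runs.
 *
 * @param table the name of the table/collection/column family to empty
 * @return zero on success, a non-zero error code otherwise
 */
public int truncate(String table) {
    // Default: unsupported. Concrete DB clients (Cassandra, Mongo, ...) override this.
    return -1;
}
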
I'm going to continue adding functionality -- right now I'm in the middle of adding delete functionality because we want to benchmark that as well --  to my fork and pushing it up if I think it could be useful for other people.

Running A WorkLoad

In YCSB terminology, a workload is a defined set of operations on a database. Workloads are stored as flat files, and executed by specific classes that extend the abstract Workload class. I'm using the CoreWorkload class (the default) for now, and may extend later. CoreWorkload lets me set the proportion of Reads vs Writes vs Updates vs Deletes in a separate property file. There are default core workload files stored in the $YCSB_HOME/workloads directory. They break out like this:
  • workloada = 50/50 read/update ratio
  • workloadb = 95/5 read/update ratio
  • workloadc = 100/0 read/update ratio
  • workloadd = 95/5 read/insert ratio (read latest)
  • workloade = 95/5 scan/insert ratio
  • workloadf = 50/50 read/read-modify-write ratio
Because they are property files, workload files can be copied/tweaked as needed.  If needed, I can also override the CoreWorkLoad class to do something different, but I haven't had to do that yet, even though I've added new functionality.

I followed the section on Running a Workload; below are my notes in addition to those instructions. 

Building YCSB
Pretty self explanatory: there is an ant target for each db client you wish to compile with:

    ant dbcompile-[DB Client Name]

Just make sure all of the jars your DB Client class needs are in the $YCSB_HOME/db/[Client DB]/lib directory. Note that sometimes, as in the case of Mongo, you may have to find those jars (slf4j, log4j) in other places.

Data Store Setup
There are some generic setup steps for all Data Stores:
  1. Create a [namespace/schema-like-element] called 'userspace'. For example, in Cassandra this would be a keyspace. In Mongo, a database, etc. 
  2. Create a [table-like element] in 'userspace', called 'data'. Again, in Cassandra this would be a column family, in Mongo, a collection, in HBase a column family, in MySQL a table. 
DB Specific details are found on the usage page.

Running YCSB

When running YCSB, make sure you specify the jar files used for the DB Client. A good first step is to bring up the interactive command line client: 

java -cp $YCSB_INSTALL/db/[DB Client dir]/lib/*:$YCSB_INSTALL/build/ycsb.jar com.yahoo.ycsb.CommandLine -db [DB Client class] 


Once you are on the commandline, make sure you can connect and see the namespace/keyspace you've created. With that sanity check done, it's time to run a workload. In order to do this I also need to load some data. This is done using the Client class from ycsb.jar with the -load parameter:

java -cp $YCSB_INSTALL/db/[DB Client dir]/lib/*:$YCSB_INSTALL/build/ycsb.jar   com.yahoo.ycsb.Client -db [DB Client class]  -p [commandline props] -P [property files] -s -load  > out

Some explanation of the available commandline parameters; note that in the above I'm running with one thread, no target ops, and loading the database via the -load parameter.
  • -threads n: execute using n threads (default: 1) - can also be specified as the "threadcount" property using -p
  • -target n: attempt to do n operations per second (default: unlimited) - can also be specified as the "target" property using -p
  • -load: run the loading phase of the workload
  • -t: run the transactions phase of the workload (default)
  • -db dbname: specify the name of the DB to use (default: com.yahoo.ycsb.BasicDB) - can also be specified as the "db" property using -p
  • -P propertyfile: load properties from the given file. Multiple files can be specified, and will be processed in the order specified
  • -p name=value: specify a property to be passed to the DB and workloads; multiple properties can be specified, and override any values in the propertyfile
  • -s: show status during run (default: no status)
  • -l label: use label for status (e.g. to label one experiment out of a whole batch)
  • -truncate, my own special addition, to clean out data stores between runs. 

Some notes on the properties files I'm loading: the first one specifies the actual workload configuration. I'm using workloads/workloada, which looks like this:

recordcount=100000
operationcount=100000
workload=com.yahoo.ycsb.workloads.CoreWorkload

readallfields=true

readproportion=0.5
updateproportion=0.5
scanproportion=0
insertproportion=0

requestdistribution=zipfian

When I specify later properties files, they override the values set in the previous ones (commandline props override everything). I take advantage of this by creating other property files that override recordcount, insertioncount, and set db specific properties that are accessed in the DB Client classes.
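
For example, an override property file might look something like this (everything other than recordcount and operationcount is illustrative; the db-specific property names depend on the DB Client class):

# overrides workloada; later -P files and -p flags win
recordcount=1000000
operationcount=1000000
# db-specific connection properties read by the DB Client class -- names are illustrative
mydb.hosts=host1,host2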

The output of the run is the average, min, max, 95th and 99th percentile latency for each operation type (read, update, etc.), a count of the return codes for each operation, and a histogram of latencies for each operation.

The histogram shows the number of calls that were returned within the specified number of milliseconds. For example:
0 45553
1 2344
2 399
3 25
4 5
5 0
reads like this:
  • 45553 calls returned in 0 ms
  • 2344 calls returned in 1ms
  • 399 calls returned in 2 ms
  • ....
That's All For Now...


YCSB as it is provides a very solid foundation for me to do testing across candidate data stores.  While it is very well documented, the code could use some love. I intend to give it enough love to evaluate the data stores I want to (in my fork), and push that love upstream. In the future I may end up writing a DB Client for some of the commercial stores we need to evaluate, as well as fix things that bug me.

Wednesday, January 5, 2011

Setting up CDH3 Hadoop on my new Macbook Pro

A New Machine 
I'm fortunate enough to have recently received a Macbook Pro, 2.8 GHz Intel dual core, with 8GB RAM.  This is the third time I've turned a vanilla mac into a ninja coding machine, and following my design principle of "first time = coincidence, second time = annoying, third time = pattern", I've decided to write down the details for the next time.

Baseline
This section details the pre-hadoop installs I did.

Java
Previously I was running on Leopard, and had to install soylatte to get the latest version of Java. In Snow Leopard, java jdk 1.6.0_22 is installed by default. That's good enough for me, for now.

Gcc, etc.
In order to get these on the box, I had to install XCode, making sure to check the 'linux dev tools' option.

MacPorts
I installed MacPorts in case I needed to upgrade any native libs or tools.

Eclipse
I downloaded the 64 bit Java EE version of Helios.

Tomcat
Tomcat is part of my daily fun, and these instructions to install tomcat6 were helpful. One thing to note is that in order to access the tomcat manager panel, you also need to specify

<role rolename="manager"/>

prior to defining

<user username="admin" password="password" roles="standard,manager,admin"/>

Also, I run tomcat standalone (no httpd), so the mod_jk install part didn't apply. Finally, I chose not to daemonize tomcat because this is a dev box, not a server, and the instructions for compiling and using jsvc for 64 bit sounded iffy at best.

Hadoop
I use the CDH distro. The install was amazingly easy, and their support rocks. Unfortunately, they don't have a dmg that drops Hadoop on the box configured and ready to run, so I need to build up my own pseudo mac node. This is what I want my mac to have (for starters):
  1. distinct processes for namenode, job tracker node, and datanode/task tracker nodes.
  2. formatted HDFS
  3. Pig 0.8.0
I'm not going to try to auto start hadoop because (again) this is a dev box, and start-all.sh should handle bringing up the JVMs around namenode, job tracker, datanode/tasktracker.

I am installing CDH3, because I've been running it in pseudo-mode on my Ubuntu dev box for the last month and have had no issues with it. Also, I want to run Pig 0.8.0, and that version may have some assumptions about the version of Hadoop that it needs.

All of the CDH3 Tarballs can be found at http://archive.cloudera.com/cdh/3/, and damn, that's a lot of tarballs.

I downloaded hadoop 0.20.2+737; it's (currently) the latest version out there. Because this is my new dev box, I decided to forego the usual security motivated setup of the hadoop user. When this decision comes back to bite me, I'll be sure to update this post. In fact, for ease of permissions/etc, I decided to install under my home dir, under a CDH3 dir, so I could group all CDH3 related installs together. I symlinked the hadoop-0.20+737 dir to hadoop, and I'll update it if CDH3 updates their version of hadoop.

After untarring to the directory, all that was left was to make sure the ~/CDH3/hadoop/bin directory was in my .profile PATH settings.

Pseudo Mode Config
I'm going to set up Hadoop in pseudo-distributed mode, just like I have on my Ubuntu box. Unlike the Debian/Red Hat CDH distros, where this is an apt-get or yum command, I need to set up conf files on my own.

Fortunately the example-confs subdir of the Hadoop install has a conf.pseudo subdir. I needed to modify the following in core-site.xml:

 <property>
     <name>hadoop.tmp.dir</name>
     <value>changed_to_a_valid_dir_I_own</value>
 </property>

and the following in hdfs-site.xml:

 <property>
     <!-- specify this so that running 'hadoop namenode -format' formats the right dir -->
     <name>dfs.name.dir</name>
     <value>changed_to_a_different_dir_I_own</value>
  </property>

I also had to create masters and slaves files in the example-confs/conf.pseudo directory:

echo localhost > masters
echo localhost > slaves

Finally, I symlinked the conf dir at the top level of the Hadoop install to example-confs/conf.pseudo after saving off the original conf:

mv ./conf install-conf
ln -sf ./example-confs/conf.pseudo conf

Pig
Installing Pig is as simple as downloading the tar, setting the path up, and going, sort of. The first time I ran pig, it tried to connect to the default install location of hadoop, /usr/lib/hadoop-0.20/. I made sure to set HADOOP_HOME to point to my install, and verified that the grunt shell connected to my configured HDFS (on port 8020).

More To Come
This pseudo node install was relatively painless. I'm going to continue to install Hadoop/HDFS based tools that may need more (HBase) or less (Hive) configuration, and update in successive posts.