Monday, April 18, 2011

Rolling out splittable lzo on CDH3

Until splittable lzo, compression options in HDFS were limited. Gzip generated unsplittable output -- great for reducing allocated block usage, terrible for mapreduce efficiency. Bz2 generated splittable output, but took far too long to be effectively used in production.

When we wanted to start incorporating compression into our storage procedures, splittable lzo was the only rational option to ensure parallel processing of compressed files.

We had tried to use bz2 compression on files prior to ingestion, but it took much longer -- approximately 20x as long as gzip compression on the same file.


For a 1GB text file:
  • gzip -1 took ~25 seconds (which is strange, actually -- I was expecting gzip to be slightly faster than lzo)
  • lzo -1 took ~9 seconds; indexing took another 4
  • bzip2 -1 took ~3 minutes

I ran each compression routine at its fastest setting to get a relative benchmark; in reality we would be running at a slower setting that increased compression.


Installing The Bits


The Java and native source for splittable lzo can be found at https://github.com/kevinweil/hadoop-lzo. If you're using the Cloudera distro, you should use the https://github.com/toddlipcon/hadoop-lzo fork.

The cluster I was installing splittable lzo on was running CentOS and walled off from the rest of the world. I found it easiest to generate RPMs on a box with the same architecture, then install those RPMs on all nodes in the cluster. I did this using the https://github.com/toddlipcon/hadoop-lzo-packager code, which takes the native and Java components and installs them to the right locations. Since I was building on a CentOS box, I ran


./run.sh --no-deb

to build RPMs only. There were two RPMs, the standard one and the debug-info one. The naming convention appears to be YYYYmmDDHHMMSS.full.version.git_hash_of_hadoop_lzo_project.arch, which lets you upgrade when either the packaging code or the original hadoop-lzo code changes.

The RPMs installed the following java and native bits (note that the packager timestamps the jars):

rpm -ql cloudera-hadoop-lzo-20110414162014.0.4.10.0.g2bd0d5b-1.x86_64


/usr/lib/hadoop-0.20/lib/cloudera-hadoop-lzo-20110414162014.0.4.10.0.g2bd0d5b.jar
/usr/lib/hadoop-0.20/lib/native
/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64
/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64/libgplcompression.a
/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64/libgplcompression.la
/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64/libgplcompression.so
/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64/libgplcompression.so.0
/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64/libgplcompression.so.0.0.0

rpm -ql cloudera-hadoop-lzo-debuginfo-20110414162014.0.4.10.0.g2bd0d5b-1.x86_64

/usr/lib/debug
/usr/lib/debug/usr
/usr/lib/debug/usr/lib
/usr/lib/debug/usr/lib/hadoop-0.20
/usr/lib/debug/usr/lib/hadoop-0.20/lib
/usr/lib/debug/usr/lib/hadoop-0.20/lib/native
/usr/lib/debug/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64
/usr/lib/debug/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64/libgplcompression.so.0.0.0.debug
/usr/lib/debug/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64/libgplcompression.so.0.debug
/usr/lib/debug/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64/libgplcompression.so.debug

Hadoop Configuration Changes

After installing the bits via RPMs, there were a couple of changes necessary to get Hadoop to recognize the new codec.

In core-site.xml:

<property>
  <name>io.compression.codecs</name>
  <value>
    org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,
    com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,
    org.apache.hadoop.io.compress.BZip2Codec
  </value>
</property>
<property>
  <name>io.compression.codec.lzo.class</name>
  <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

registers the codecs with the compression codec factory.
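
As a quick sanity check that the codec factory can actually see the new codecs, a throwaway class like the sketch below is handy. This is illustrative, not something from my actual rollout: it assumes the hadoop-lzo jar and the core-site.xml above are on the classpath, and the class name and the .lzo path are arbitrary -- the factory only looks at the file suffix.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

// Asks the codec factory (driven by io.compression.codecs) which codec it
// would pick for a .lzo file. The path is hypothetical -- only the suffix matters.
public class CodecCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration(); // picks up core-site.xml from the classpath
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(new Path("/tmp/out.lzo"));
    System.out.println(codec == null ? "no codec registered for .lzo"
                                     : codec.getClass().getName());
  }
}

If everything is wired up, this should print com.hadoop.compression.lzo.LzopCodec; a null means either the jar or the config isn't being picked up.
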
In mapred-site.xml:

<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>
<property>
  <name>mapred.map.output.compression.codec</name>
  <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

sets intermediate output to be lzo compressed. After pushing configs out to all nodes in the cluster, I restarted the cluster. The next step was to verify that lzo was installed correctly.

Validation

There were some hiccups I ran into during validation -- all pilot error, but I wanted to put them all in one place for next time. My validation steps looked like this:

(1) create an lzo file that was greater than my block size.
(2) upload and index it.
(3) run a mapreduce using the default IdentityMapper.
(4) verify that multiple mappers were run from the one lzo file.
(5) verify that the output was the same size and format as the input.

My first mistake: I lzo compressed a set of files. The splittable lzo code only works with a single file. This took me a while to figure out -- mostly due to tired brain. After I had catted the files together into a single file, then lzo'd that file, I was able to upload it to HDFS and index it:

hadoop jar /usr/lib/hadoop/lib/cloudera-hadoop-lzo-20110414162014.0.4.10.0.g2bd0d5b.jar com.hadoop.compression.lzo.LzoIndexer /tmp/out.lzo

This created an index file. From this great article on the Cloudera site: "Once the index file has been created, any LZO-based input format can split compressed data by first loading the index, and then nudging the default input splits forward to the next block boundaries."

Since I had an uploaded, indexed file at this point, I moved on to steps 3 and 4. Before I could make the IdentityMapper, I needed to get the LZO bits on my Mac so that the IdentityMapper could run.


Detour: Getting the Bits on my Mac

I dev on a Mac, but run the cluster on CentOS (I can already feel the wrath of Ted Dziuba coming down from on high). I found the instructions here adequate for getting the changes I needed to the IdentityMapper code to compile.

Back to Validation

I ran an IdentityMapper on the original source (side note: in 0.20, to run an IdentityMapper, just don't specify a mapper; the default Mapper class implements pass-through mapping). I watched the cluster to make sure that the original file was split out across mappers. It wasn't. I was stumped -- I knew this was something simple, but couldn't see what it was.

After a gentle reminder from Cloudera Support (one of many in the last couple of days, actually :), I set my input format class to LzoTextInputFormat, which -- as the same article above mentions in the next sentence -- "splits compressed data by first loading the index, and then nudges the default input splits forward to the next block boundaries. With these nudged splits, each mapper gets an input split that is aligned to block boundaries, meaning it can more or less just wrap its InputStream in an LzopInputStream and be done." When I had used the default TextInputFormat, the mapreduce was working, but the input was being decompressed without being split.

job.setInputFormatClass(LzoTextInputFormat.class);

Once I had observed splitting behavior from my indexed lzo file by confirming multiple map tasks, I made sure that output was recompressed as lzo by setting FileOutputFormat properties:

FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

This is different from the instructions in Hadoop: The Definitive Guide, and I found it after some googling around. The instructions in the book -- setting properties on the Configuration object -- did not work, most likely because the book was written for an earlier version of Hadoop.

Once I had added those lines to my Tool subclass, I was able to get compressed output that matched my compressed input: the exact result I was looking for when validating with the IdentityMapper.
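
For reference, the whole driver ends up looking something like the sketch below. This is not my exact code -- the class name, paths, and the map-only choice are illustrative -- but it pulls together the pieces above: LzoTextInputFormat (from the hadoop-lzo jar, package com.hadoop.mapreduce) for index-aware splits, and lzop-compressed output via FileOutputFormat.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.hadoop.compression.lzo.LzopCodec;
import com.hadoop.mapreduce.LzoTextInputFormat;

// Identity job over an indexed .lzo file. No mapper class is set, so the
// default pass-through Mapper runs; map-only, so there is one output file per
// input split, which makes it easy to confirm that the file actually split.
public class LzoIdentityJob extends Configured implements Tool {

  public int run(String[] args) throws Exception {
    Job job = new Job(getConf(), "lzo-identity");
    job.setJarByClass(LzoIdentityJob.class);

    // Use the lzo index to generate block-aligned splits.
    job.setInputFormatClass(LzoTextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));

    // Recompress the output with lzop so it matches the input format.
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

    // Note: TextOutputFormat prepends the byte-offset key to each line, so this
    // is for checking splits and compression, not byte-for-byte comparison.
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(0); // map-only: skip the shuffle entirely

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new LzoIdentityJob(), args));
  }
}

Run against an indexed file like /tmp/out.lzo, the jobtracker should show one map task per index-aligned split, and each part file in the output directory should come out with an .lzo extension.
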

Monday, April 11, 2011

HDFS file size vs allocation

Recently, I had to understand HDFS at a deeper level that had nothing to do with running mapreduce jobs or writing to the FileSystem API. Specifically, I had to understand the way that HDFS interacts with the underlying filesystem, and the difference between actual HDFS file size and the way HDFS calculates available storage when using quotas.

We recently discovered a bunch of files that were much smaller than our allocated block size -- on average they took up roughly 1/10th of an allocated block.

This was not the standard small-file problem, where the namenode requires too much memory to track metadata for large numbers of files (tens of millions), at roughly 150 bytes of metadata per file.

My immediate conclusion was that these small files were each effectively taking up a full block, and that we were running out of space -- fast! -- because that was the behavior I thought I was seeing at the HDFS level: I thought storage was allocated a block at a time, and quotas were determined based on available blocks.

That last statement is partially correct. Storage is allocated a block -- actually a block * replication factor -- at a time. However, quotas are determined based on available bytes. A space quota, according to the docs, is "a hard limit on the number of bytes used by files in the tree rooted at that directory. Block allocations fail if the quota would not allow a full block to be written."

This is what that means: the only time files are measured in blocks is at block allocation time; the rest of the time, files are measured in bytes. The space quota is calculated against the number of bytes, not blocks, left in the cluster. When a user tries to upload a file, that number of bytes is converted into the number of blocks that would be required to store the file. The key is that space is calculated in blocks at allocation time, so no matter how small a file is, you always need 1 block * replication factor of space available to put it in the cluster.
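
For example, with a 128MB block size and the default replication factor of 3, even a 1KB file needs 384MB (one block times three replicas) of available space at allocation time, even though once written it only occupies a few kilobytes of actual disk across the cluster.
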

HDFS Operational Details

I spent some time asking, researching, and re-reading the book, and found that making analogies from a standard filesystem to understand HDFS helped me immensely -- to a point (more on that later).

In a standard filesystem, an inode contains file metadata -- permissions, ownership, last modified time, etc. -- in addition to a set of pointers to all of the blocks that comprise the file. Inodes are kept in a specific location in the filesystem and are used to access files.

The inode and block equivalents in HDFS are distributed across the namenode and the datanodes.

The namenode maintains file system metadata, which is analogous to the inodes in a standard FS. This metadata is stored in {dfs.name.dir}/current. Datanodes contain blocks of data, stored as block files in the underlying filesystem.

On the datanode, HDFS stores block data in files in the directory specified by dfs.data.dir (which defaults to {hadoop.tmp.dir}/dfs/data); block files live under the current/ subdirectory there. HDFS may create subdirectories underneath that dir to balance out files across directories (many filesystems have a file-per-directory limit). The raw data for each block is kept in two files: a blk_NNNN file and a corresponding blk_NNNN_XXXX.meta file, which contains the block checksums used in integrity checks.
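
Concretely, the layout under a data directory looks something like this (schematic -- blk_NNNN stands in for a real block id and XXXX for its generation stamp):

{dfs.data.dir}/current/
  VERSION
  blk_NNNN              <- the raw bytes for one block (at most one block's worth)
  blk_NNNN_XXXX.meta    <- the checksums for that block
  subdir0/              <- extra directories created as needed to spread block files out
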

The block file and checksum file information is periodically sent to the namenode as a block report -- for example, at HDFS startup (HDFS enters safemode while the namenode processes block reports from its constituent datanodes). Note that a datanode has no idea which block files map to which actual files; it just tracks the blocks. This makes the namenode very critical to HDFS functionality.

To summarize: the metadata that inodes maintain in a standard FS is maintained by the HDFS namenode, and the actual file data that lives in filesystem blocks in a standard FS lives in HDFS blocks on the datanodes. The datanodes store that block data in block files, maintain checksums of the block data for integrity checking, and update the namenode with information about the blocks they manage.

FileSystem Analogies That Do and Don't Work

In a standard filesystem, disks have a minimum unit of data they can read or write, called a disk block. Unix disk blocks are 512 bytes. Filesystems also have a minimum read/write unit, the filesystem block, typically a few kilobytes.

Files on a standard filesystem are typically much larger than a block. Since most files are not an exact multiple of the block size, the 'remainder' of the file that does not fill a block still consumes the whole block on disk. In general (ReiserFS being one exception), the difference between the file's real size and its allocated size -- the slack -- cannot be used by any other file.
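
For example, a 100-byte file on a filesystem with 4KB blocks still consumes a full 4KB block, and the remaining ~3.9KB of slack is unusable by other files.
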

In HDFS, if a file is smaller than a block in size, it does not take up an entire HDFS block on disk. There is no concept of HDFS block 'slack space'. A small file takes up as many bytes as it would in a normal filesystem because it is stored as a block file in the normal filesystem. This is where the definition of HDFS block differs from a traditional file block, and this is where my mental model of HDFS as a filesystem failed me :)

While the file and block analogy is valid in HDFS, the size of the blocks makes the difference between file allocated size (always represented in blocks) and file actual size (always in bytes) much larger than it would be on a traditional file system. So you can't treat allocated vs actual size as equivalents, like you effectively can on a traditional filesystem where the block size to file size ratio is relatively tiny. 

Small Files on the Datanodes

At allocation time, a small file will require a single block file on each datanode that holds a replica. The actual number of block files required to store that file on the cluster depends on the HDFS replication policy, which defaults to 3. So, factoring in replication, a small (less than one block) file is stored as three identical block files on separate nodes.

That block file is the same size as the small file -- large files would span several blocks and be split into block-sized files. A 350MB file on a system with a 128MB block size would be split into 3 blocks, the first two of 128MB and the last of 94MB, and each of those would be replicated according to the cluster's replication policy. The only files that don't take up space on the datanodes are zero-byte files, which still take up space on the namenode.

Regardless of actual size, at allocation time HDFS treats a small file as needing a minimum of one HDFS block per replica when it is calculating available disk space. So even if a file is really small, if there are fewer than three blocks available on the cluster (at the default replication factor), the file cannot be stored on the system.

Space Quotas
HDFS only ends up with fewer replicated blocks available than it needs to store a file when it is running out of space or, more commonly, when there is a space quota on the directory the file is being copied to. Calculating storage cost in blocks lets HDFS safely store data up to a known maximum size, no matter what the actual size of the file is. HDFS will only permit new block creation if there is enough disk space to create a block on N datanodes, where N is the replication factor.

This article shows how HDFS block size combined with the replication factor -- not actual file size -- determines available space.

Conclusion

Is this really a problem? Sort of -- it's a matter of efficiency. Space quotas are checked against the number of bytes remaining, not blocks. If a block file takes up 12MB on a system with a 128MB block size, there are effectively 116MB left over that still count toward the bytes available under the space quota -- with a replication factor of 3, that's 348MB, or roughly 2.7 blocks. You could argue that the 0.7 of a block is wasted, since 2.7 blocks is less than the minimum three full blocks needed to store a file of _any_ size with a replication factor of 3, but 2 whole blocks of that space are still usable for quota calculations. And with two such 12MB files, you have roughly 5.4 blocks available across the system. In effect, if you mod the 'leftover' space by the replication factor, you waste at most a replication factor's worth of blocks.

Granted that's not the most efficient use of disk, but it's not as if a small file takes up a 'virtual' block that gets factored in the next time a file is copied into the cluster.

The bigger problem with small files is the inefficiency they cause in mapreduce operations. With small files you can't reduce the number of mappers or traverse a block's worth of data at a time -- one mapper is spun up per file, and the overhead of copying the job jar to the tasktracker node, starting up the JVM, etc., only makes sense if there is a substantial amount of data to process. You can't go wrong with large files -- they split across blocks, which are processed more efficiently.