I've been changing a database ETL application into a set of MapReduces up on EC2. I need s3 as my input and output for each MapReduce, and was excited to see that Hadoop had s3 filesystem support built in.
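As a quick sketch of what that support looks like (the bucket name below is a placeholder), the s3n filesystem lets you use s3 URIs anywhere you would normally use an HDFS path, once your AWS credentials are configured:

# placeholder bucket name; requires fs.s3n.awsAccessKeyId and
# fs.s3n.awsSecretAccessKey to be set (see the hadoop-site.xml later in this post)
hadoop fs -ls s3n://my-bucket/input/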
I hit an s3-related issue with 0.18.3, so I then started looking at 0.19.0, where that issue was fixed, but I found another one (again s3 related, this time when reading the input directory). I was able to reproduce this issue on my local box immediately, which saved some time.
This left me with 0.20.0, which claimed to have both issues fixed. I tested 0.20.0 on my local box with a small data set, and it passed. The next step was to build an AMI with Hadoop 0.20.0 on it, deploy that AMI to a reasonably sized cluster, and try to get through an entire run of my 133 million record input set, which was estimated to reduce to a 7.5 million record output set.
I decided to start by using the 0.20.0 src/contrib/ec2 scripts. The experience of working with the original src/contrib/ec2 files in 0.18.3, and then with the Cloudera scripts, allowed me to move much faster this time.
In order to build an image using the scripts, you need to specify the following variables (in addition to the account access variables detailed here); a sketch of my settings follows the list.
HADOOP_VERSION -- I set this to 0.20.0.
S3_BUCKET -- I used my own bucket to store the AMI.
INSTANCE_TYPE -- Amazon Small and Medium instances are 32-bit; Large and XLarge instances are 64-bit. Specifying INSTANCE_TYPE lets the shell scripts load the correct base OS image.
JAVA_BINARY_URL -- the download link for the version of Java you want to use. Note that this varies with the architecture (i386 or x86_64). For i386 I used:
http://cds.sun.com/is-bin/INTERSHOP.enfinity/WFS/CDS-CDS_Developer-Site/en_US/-/USD/VerifyItem-Start/jdk-6u13-linux-i586.bin?BundledLineItemUUID=teRIBe.ohNsAAAEhz0pwgkAW&OrderID=yI1IBe.oPMYAAAEhuEpwgkAW&ProductID=RGtIBe.ou1AAAAEfpVYcydOO&FileName=/jdk-6u13-linux-i586.bin
Note that I then had to change my JAVA_VERSION to match the minor version specified: i.e. for the link above I had to set JAVA_VERSION to 1.6.0_13.
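For reference, here is roughly what those settings looked like in hadoop-ec2-env.sh once I was done (the bucket name below is a placeholder, and the Java URL is shortened; use the full download link above):

# sketch of my hadoop-ec2-env.sh settings; bucket name is a placeholder,
# and JAVA_BINARY_URL is abbreviated (use the full Sun download link)
HADOOP_VERSION=0.20.0
S3_BUCKET=my-ami-bucket
INSTANCE_TYPE=m1.small        # 32-bit; pick a Large/XLarge type for 64-bit
JAVA_BINARY_URL=http://cds.sun.com/.../jdk-6u13-linux-i586.bin
JAVA_VERSION=1.6.0_13         # must match the JDK version in JAVA_BINARY_URL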
Now that all variables were configured, I ran
hadoop-ec2 create-image
to create the exact Hadoop image I needed.
With the image created, I then used
hadoop-ec2 initialize-cluster mycluster 20
to create a 20-node cluster. I logged in, and the first thing I noticed was that the JobTracker was not running on the master and the TaskTrackers were not running on the slaves, even though they were specified to start right after the NameNode and DataNode (respectively) in the shell script executed at AMI boot time:
if [ "$IS_MASTER" == "true" ]; then
# MASTER
...
# Hadoop
# only format on first boot
[ ! -e /mnt/hadoop/dfs ] && "$HADOOP_HOME"/bin/hadoop namenode -format
"$HADOOP_HOME"/bin/hadoop-daemon.sh start namenode
"$HADOOP_HOME"/bin/hadoop-daemon.sh start jobtracker
else
# SLAVE
...
# Hadoop
"$HADOOP_HOME"/bin/hadoop-daemon.sh start datanode
"$HADOOP_HOME"/bin/hadoop-daemon.sh start tasktracker
fi
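One quick way to see which daemons are actually up on a node is jps, which ships with the JDK and lists the running Java processes; on a correctly booted master you should see a NameNode and a JobTracker, and on a slave a DataNode and a TaskTracker.

# run on the master (or over ssh on a slave) to list the Hadoop daemons
jps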
So I ran the following command to get the (internal) names of the cluster nodes (from my laptop):
ec2-describe-instances | grep -w 'infocloud' | grep -ve 'infocloud-cluster.*' | awk '{print $5}'
In this line I grep for my security group (infocloud), exclude the non-AMI lines that contain my cluster name (infocloud-cluster.*), and finally print the fifth field of each remaining line. This gives me a list of (Amazon EC2) internal domain names, like this:
domU-12-31-39-02-B4-F3.compute-1.internal
domU-12-31-39-00-B5-12.compute-1.internal
domU-12-31-39-00-5D-E3.compute-1.internal
domU-12-31-39-00-56-46.compute-1.internal
domU-12-31-39-00-58-51.compute-1.internal
domU-12-31-39-00-A8-B6.compute-1.internal
domU-12-31-39-00-85-D1.compute-1.internal
domU-12-31-39-01-74-22.compute-1.internal
domU-12-31-39-00-E8-94.compute-1.internal
domU-12-31-39-00-C6-13.compute-1.internal
domU-12-31-39-00-DC-65.compute-1.internal
domU-12-31-39-00-4D-D3.compute-1.internal
domU-12-31-39-01-5C-B6.compute-1.internal
domU-12-31-39-00-B2-54.compute-1.internal
domU-12-31-39-00-66-06.compute-1.internal
domU-12-31-39-00-E5-B7.compute-1.internal
domU-12-31-39-00-68-06.compute-1.internal
domU-12-31-39-00-88-46.compute-1.internal
domU-12-31-39-00-7D-C8.compute-1.internal
domU-12-31-39-00-A1-08.compute-1.internal
domU-12-31-39-00-C2-15.compute-1.internal
Note that the first node in this list is the master. I redirected this output into a file (nodes.txt) that I then pushed up to the master:
hadoop-ec2 push infocloud-cluster nodes.txt
and then wrote some ruby to parse it:
HADOOP_HOME="/usr/local/hadoop-0.20.0"
File.open("slaves.txt") do | file |
cmd = "ssh #{slave} #{HADOOP_HOME}/bin/hadoop-daemon.sh start"
while(slave = file.gets)
slave = slave.chomp
hostname = `hostname`
if(slave.starts_with(hostname))
cmd += " jobtracker"
else
cmd += " tasktracker"
end
`#{cmd}`
end
end
So I start the job tracker on the master node (where I run the job from), and a task tracker on every other node. Note that I shouldn't have to do this at all, and I'm still trying to figure out why the original commands in the remote startup script didn't work.
Once the job tracker and task trackers were started, the cluster was effectively up. I'm going to see if I can get the remote startup script to work as designed, because that final step is hacky.
Finally, once I had the cluster up, I noticed that only one node was configured to run reduces. I remedied this by changing the generated hadoop-site.xml, which, by the way, is flagged as deprecated in 0.20.0 (it still works, but probably won't in the next version). The hadoop-site.xml is generated in hadoop-ec2-init-remote.sh; here is what I ended up with:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>/mnt/hadoop</value>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://$MASTER_HOST:50001</value>
</property>
<property>
<name>mapred.job.tracker</name>
<value>hdfs://$MASTER_HOST:50002</value>
</property>
<property>
<name>tasktracker.http.threads</name>
<value>80</value>
</property>
<property>
<name>mapred.reduce.parallel.copies</name>
<value>20</value>
</property>
<property>
<name>mapred.reduce.tasks</name>
<value>20</value>
</property>
<property>
<name>mapred.tasktracker.map.tasks.maximum</name>
<value>3</value>
</property>
<property>
<name>mapred.tasktracker.reduce.tasks.maximum</name>
<value>3</value>
</property>
<property>
<name>mapred.output.compress</name>
<value>true</value>
</property>
<property>
<name>mapred.output.compression.type</name>
<value>BLOCK</value>
</property>
<property>
<name>dfs.client.block.write.retries</name>
<value>3</value>
</property>
<property>
<name>fs.s3n.awsAccessKeyId</name>
<value>YOUR ACCESS KEY ID</value>
</property>
<property>
<name>fs.s3n.awsSecretAccessKey</name>
<value>YOUR SECRET KEY</value>
</property>
</configuration>