Distributed data processing with Hadoop, Part 2
Going further
Install and configure a multinode cluster
Content series:
This content is part # of # in the series: Distributed data processing with Hadoop, Part 2
This content is part of the series:Distributed data processing with Hadoop, Part 2
Stay tuned for additional content in this series.
The true power of the Hadoop distributed computing architecture lies in its distribution. In other words, the ability to distribute work to many nodes in parallel permits Hadoop to scale to large infrastructures and, similarly, the processing of large amounts of data. This article starts with a decomposition of a distributed Hadoop architecture, and then explores distributed configuration and use.
The distributed Hadoop architecture
Recall from Part 1 in this series that all Hadoop daemons were run on the same host. Although not exercising the parallel nature of Hadoop, this pseudo-distributed configuration permitted an easy way to test the features of Hadoop with minimal setup. Now, let's explore the parallel nature of Hadoop using a cluster of machines.
From Part 1, the Hadoop configuration defined that all Hadoop daemons run on a single node. So, let's first look at how Hadoop is naturally distributed for parallel operation. In a distributed Hadoop setup, you'll have a master node and some number of slave nodes (see Figure 1).
Figure 1. Hadoop master and slave node decomposition

As shown in Figure 1, the master node consists of the namenode, secondary namenode, and jobtracker daemons (the so-called master daemons). In addition, this is the node from which you manage the cluster for the purposes of this demomonstration (using the Hadoop utility and browser). The slave nodes consist of the tasktracker and the datanode (the slave daemons). The distinction of this setup is that the master node contains those daemons that provide management and coordination of the Hadoop cluster, where the slave node contains the daemons that implement the storage functions for the Hadoop file system (HDFS) and MapReduce functionality (the data processing function).
For this demonstration, you create a master node and two slave nodes sitting on a single LAN. This setup is shown in Figure 2. Now, let's explore the installation of Hadoop for multinode distribution and its configuration.
Figure 2. Hadoop cluster configuration

To simplify the deployment, you employ virtualization, which provides a few advantages. Although performance may not be advantageous in this setting, using virtualization, it's possible to create a Hadoop installation, and then clone it for the other nodes. For this reason, your Hadoop cluster should appear as follows, running the master and slave nodes as virtual machines (VMs) in the context of a hypervisor on a single host (see Figure 3).
Figure 3. Hadoop cluster configuration in a virtual environment

Upgrading Hadoop
In Part 1, you installed a special distribution for Hadoop that ran on a single node (pseudo-configuration). In this article, you update for a distributed configuration. If you've begun this article series here, read through Part 1 to install the Hadoop pseudo-configuration first.
In the pseudo-configuration, you performed no configuration, as everything
was preconfigured for a single node. Now, you need to update the
configuration. First, check the current configuration using the
update-alternatives
command as shown in Listing
1. This command tells you that the configuration is using conf.pseudo (the
highest priority).
Listing 1. Checking the current Hadoop configuration
$ update-alternatives --display hadoop-0.20-conf hadoop-0.20-conf - status is auto. link currently points to /etc/hadoop-0.20/conf.pseudo /etc/hadoop-0.20/conf.empty - priority 10 /etc/hadoop-0.20/conf.pseudo - priority 30 Current `best' version is /etc/hadoop-0.20/conf.pseudo. $
Next, create a new configuration by copying an existing one (in this case, conf.empty, as shown in Listing 1):
$ sudo cp -r /etc/hadoop-0.20/conf.empty /etc/hadoop-0.20/conf.dist $
Finally, activate and check the new configuration:
Listing 2. Activating and checking the Hadoop configuration
$ sudo update-alternatives --install /etc/hadoop-0.20/conf hadoop-0.20-conf \ /etc/hadoop-0.20/conf.dist 40 $ update-alternatives --display hadoop-0.20-conf hadoop-0.20-conf - status is auto. link currently points to /etc/hadoop-0.20/conf.dist /etc/hadoop-0.20/conf.empty - priority 10 /etc/hadoop-0.20/conf.pseudo - priority 30 /etc/hadoop-0.20/conf.dist - priority 40 Current `best' version is /etc/hadoop-0.20/conf.dist. $
Now, you have a new configuration called conf.dist that you'll use for your new distributed configuration. At this stage, running in a virtualized environment, you clone this node into two additional nodes that will serve as the data nodes.
Configuring Hadoop for distributed operation
The next step is to make all the nodes familiar with one another. You do this in the /etc/hadoop-0.20/conf.dist files called masters and slaves. The three nodes in this example are statically assigned IP addresses, as shown here (from /etc/hosts):
Listing 3. Hadoop nodes for this setup (/etc/hosts)
master 192.168.108.133 slave1 192.168.108.134 slave2 192.168.108.135
So, on the master node, you update /etc/hadoop-0.20/conf.dist/masters to identify the master node, which appears as:
master
and then identify the slave nodes in /etc/hadoop-0.20/conf.dist/slaves, which contains the following two lines:
slave1 slave2
Next, from each node, connect through Secure Shell (ssh) to each of the other nodes to ensure that pass-phraseless ssh is working. Each of these files (masters, slaves) is used by the Hadoop start and stop utilities that you used in Part 1 of this series.
Next, continue with Hadoop-specific configuration in the /etc/hadoop-0.20/conf.dist subdirectory. The following changes are required on all nodes (master and both slaves), as defined by the Hadoop documentation. First, identify the HDFS master in the file core-site.xml (Listing 4), which defines the host and port of the namenode (note the use of the master node's IP address). The file core-site.xml defines the core properties of Hadoop.
Listing 4. Defining the HDFS master in core-site.xml
<configuration> <property> <name>fs.default.name<name> <value>hdfs://master:54310<value> <description>The name and URI of the default FS.</description> <property> <configuration>
Next, identify the MapReduce jobtracker. This jobtracker could exist on its own node, but for this configuration, place it on the master node as shown in Listing 5. The file mapred-site.xml contains the MapReduce properties.
Listing 5. Defining the MapReduce jobtracker in mapred-site.xml
<configuration> <property> <name>mapred.job.tracker<name> <value>master:54311<value> <description>Map Reduce jobtracker<description> <property> <configuration>
Finally, define the default replication factor (Listing 6). This value defines the number of replicas that will be created and is commonly no larger than three. In this case, you define it as 2 (the number of your datanodes). This value is defined in hdfs-site.xml, which contains the HDFS properties.
Listing 6. Defining the default replication for data in hdfs-site.xml
<configuration> <property> <name>dfs.replication<name> <value>2<value> <description>Default block replication<description> <property> <configuration>
The configuration items shown in Listing 4, Listing 5, and Listing 6) are the required elements for your distributed setup. Hadoop provides a large number of configuration options here, which allows you to tailor the entire environment. The Related topics section provides more information on what's available.
With your configuration complete, the next step is to format your namenode
(the HDFS master node). For this operation, use the
hadoop-0.20
utility, specifying the namenode
and operation (-format
):
Listing 7. Formatting the namenode
user@master:~# sudo su - root@master:~# hadoop-0.20 namenode -format 10/05/11 18:39:58 INFO namenode.NameNode: STARTUP_MSG: /************************************************************ STARTUP_MSG: Starting NameNode STARTUP_MSG: host = master/127.0.1.1 STARTUP_MSG: args = [-format] STARTUP_MSG: version = 0.20.2+228 STARTUP_MSG: build = -r cfc3233ece0769b11af9add328261295aaf4d1ad; ************************************************************/ 10/05/11 18:39:59 INFO namenode.FSNamesystem: fsOwner=root,root 10/05/11 18:39:59 INFO namenode.FSNamesystem: supergroup=supergroup 10/05/11 18:39:59 INFO namenode.FSNamesystem: isPermissionEnabled=true 10/05/11 18:39:59 INFO common.Storage: Image file of size 94 saved in 0 seconds. 10/05/11 18:39:59 INFO common.Storage: Storage directory /tmp/hadoop-root/dfs/name has been successfully formatted. 10/05/11 18:39:59 INFO namenode.NameNode: SHUTDOWN_MSG: /************************************************************ SHUTDOWN_MSG: Shutting down NameNode at master/127.0.1.1 ************************************************************/ root@master:~#
With your namenode formatted, it's time to start the Hadoop daemons. You do
this identically to your previous pseudo-distributed configuration in Part
1, but the process accomplishes the same thing for a
distributed configuration. Note here that this code starts the namenode
and secondary namenode (as indicated by the jps
command):
Listing 8. Starting the namenode
root@master:~# /usr/lib/hadoop-0.20/bin/start-dfs.sh starting namenode, logging to /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-namenode-mtj-desktop.out 192.168.108.135: starting datanode, logging to /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-datanode-mtj-desktop.out 192.168.108.134: starting datanode, logging to /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-datanode-mtj-desktop.out 192.168.108.133: starting secondarynamenode, logging to /usr/lib/hadoop-0.20/logs/hadoop-root-secondarynamenode-mtj-desktop.out root@master:~# jps 7367 NameNode 7618 Jps 7522 SecondaryNameNode root@master:~#
If you now inspect one of the slave nodes (data nodes) using
jps
, you'll see that a datanode daemon now
exists on each node:
Listing 9. Inspecting the datanode on one of the slave nodes
root@slave1:~# jps 10562 Jps 10451 DataNode root@slave1:~#
The next step is to start the MapReduce daemons (jobtracker and
tasktracker). You do this as shown in Listing 10.
Note that the script starts the jobtracker on the master node (as defined
by your configuration; see Listing 5) and the
tasktrackers on each slave node. A jps
command
on the master node shows that the jobtracker is now running.
Listing 10. Starting the MapReduce daemons
root@master:~# /usr/lib/hadoop-0.20/bin/start-mapred.sh starting jobtracker, logging to /usr/lib/hadoop-0.20/logs/hadoop-root-jobtracker-mtj-desktop.out 192.168.108.134: starting tasktracker, logging to /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-tasktracker-mtj-desktop.out 192.168.108.135: starting tasktracker, logging to /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-tasktracker-mtj-desktop.out root@master:~# jps 7367 NameNode 7842 JobTracker 7938 Jps 7522 SecondaryNameNode root@master:~#
Finally, check a slave node with jps
. Here, you
see that a tasktracker daemon has joined the datanode daemon to each slave
data node:
Listing 11. Inspecting the datanode on one of the slave nodes
root@slave1:~# jps 7785 DataNode 8114 Jps 7991 TaskTracker root@slave1:~#
The relationships between the start scripts, the nodes, and the daemons
that are started are shown in Figure 4. As you can see, the
start-dfs
script starts the namenodes and
datanodes, where the start-mapred
script starts
the jobtracker and tasktrackers.
Figure 4. Relationship of the start scripts and daemons for each node

Testing HDFS
Now that Hadoop is up and running across your cluster, you can run a couple
of tests to ensure that it's operational (see Listing 12). First, issue a
file system command (fs
) through the
hadoop-0.20
utility and request a
df
(disk free) operation. As with Linux®,
this command simply identifies the space consumed and available for the
particular device. So, with a newly formatted file system, you've used no
space. Next, perform an ls
operation on the
root of HDFS, create a subdirectory, list its contents, and remove it.
Finally, you can perform an fsck
(file system
check) on HDFS using the fsck
command within
the hadoop-0.20
utility. All this tells
you—along with a variety of other information (such as 2 datanodes
were detected)—that the file system is healthy.
Listing 12. Checking the HDFS
root@master:~# hadoop-0.20 fs -df File system Size Used Avail Use% / 16078839808 73728 3490967552 0% root@master:~# hadoop-0.20 fs -ls / Found 1 items drwxr-xr-x - root supergroup 0 2010-05-12 12:16 /tmp root@master:~# hadoop-0.20 fs -mkdir test root@master:~# hadoop-0.20 fs -ls test root@master:~# hadoop-0.20 fs -rmr test Deleted hdfs://192.168.108.133:54310/user/root/test root@master:~# hadoop-0.20 fsck / .Status: HEALTHY Total size: 4 B Total dirs: 6 Total files: 1 Total blocks (validated): 1 (avg. block size 4 B) Minimally replicated blocks: 1 (100.0 %) Over-replicated blocks: 0 (0.0 %) Under-replicated blocks: 0 (0.0 %) Mis-replicated blocks: 0 (0.0 %) Default replication factor: 2 Average block replication: 2.0 Corrupt blocks: 0 Missing replicas: 0 (0.0 %) Number of data-nodes: 2 Number of racks: 1 The filesystem under path '/' is HEALTHY root@master:~#
Performing a MapReduce job
The next step is to perform a MapReduce job to validate that your entire
setup is working properly (see Listing 13). The first step of this process
is to introduce some data. So, begin by creating a directory to hold your
input data (called input), which you do using the
hadoop-0.20
utility with the
mkdir
command. Then, use the
put
command of
hadoop-0.20
to put two files into HDFS. You can
check the contents of the input directory using the
ls
command of the Hadoop utility.
Listing 13. Generating input data
root@master:~# hadoop-0.20 fs -mkdir input root@master:~# hadoop-0.20 fs -put \ /usr/src/linux-source-2.6.27/Doc*/memory-barriers.txt input root@master:~# hadoop-0.20 fs -put \ /usr/src/linux-source-2.6.27/Doc*/rt-mutex-design.txt input root@master:~# hadoop-0.20 fs -ls input Found 2 items -rw-r--r-- 2 root supergroup 78031 2010-05-12 14:16 /user/root/input/memory-barriers.txt -rw-r--r-- 2 root supergroup 33567 2010-05-12 14:16 /user/root/input/rt-mutex-design.txt root@master:~#
Next, kick off the wordcount MapReduce job. As in the pseudo-distributed model, you specify your input subdirectory (which contains the input files) and the output directory (which doesn't exist but will be created by the namenode and populated with the result data):
Listing 14. Running the MapReduce wordcount job on the cluster
root@master:~# hadoop-0.20 jar \ /usr/lib/hadoop-0.20/hadoop-0.20.2+228-examples.jar wordcount input output 10/05/12 19:04:37 INFO input.FileInputFormat: Total input paths to process : 2 10/05/12 19:04:38 INFO mapred.JobClient: Running job: job_201005121900_0001 10/05/12 19:04:39 INFO mapred.JobClient: map 0% reduce 0% 10/05/12 19:04:59 INFO mapred.JobClient: map 50% reduce 0% 10/05/12 19:05:08 INFO mapred.JobClient: map 100% reduce 16% 10/05/12 19:05:17 INFO mapred.JobClient: map 100% reduce 100% 10/05/12 19:05:19 INFO mapred.JobClient: Job complete: job_201005121900_0001 10/05/12 19:05:19 INFO mapred.JobClient: Counters: 17 10/05/12 19:05:19 INFO mapred.JobClient: Job Counters 10/05/12 19:05:19 INFO mapred.JobClient: Launched reduce tasks=1 10/05/12 19:05:19 INFO mapred.JobClient: Launched map tasks=2 10/05/12 19:05:19 INFO mapred.JobClient: Data-local map tasks=2 10/05/12 19:05:19 INFO mapred.JobClient: FileSystemCounters 10/05/12 19:05:19 INFO mapred.JobClient: FILE_BYTES_READ=47556 10/05/12 19:05:19 INFO mapred.JobClient: HDFS_BYTES_READ=111598 10/05/12 19:05:19 INFO mapred.JobClient: FILE_BYTES_WRITTEN=95182 10/05/12 19:05:19 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=30949 10/05/12 19:05:19 INFO mapred.JobClient: Map-Reduce Framework 10/05/12 19:05:19 INFO mapred.JobClient: Reduce input groups=2974 10/05/12 19:05:19 INFO mapred.JobClient: Combine output records=3381 10/05/12 19:05:19 INFO mapred.JobClient: Map input records=2937 10/05/12 19:05:19 INFO mapred.JobClient: Reduce shuffle bytes=47562 10/05/12 19:05:19 INFO mapred.JobClient: Reduce output records=2974 10/05/12 19:05:19 INFO mapred.JobClient: Spilled Records=6762 10/05/12 19:05:19 INFO mapred.JobClient: Map output bytes=168718 10/05/12 19:05:19 INFO mapred.JobClient: Combine input records=17457 10/05/12 19:05:19 INFO mapred.JobClient: Map output records=17457 10/05/12 19:05:19 INFO mapred.JobClient: Reduce input records=3381 root@master:~#
The final step is to explore the output data. Because you ran the wordcount MapReduce job, the result is a single file (reduced from the processed map files). This file contains a list of tuples representing the words found in the input files and the number of times they appeared in all input files:
Listing 15. Inspecting the output of the MapReduce job
root@master:~# hadoop-0.20 fs -ls output Found 2 items drwxr-xr-x - root supergroup 0 2010-05-12 19:04 /user/root/output/_logs -rw-r--r-- 2 root supergroup 30949 2010-05-12 19:05 /user/root/output/part-r-00000 root@master:~# hadoop-0.20 fs -cat output/part-r-00000 | head -13 != 1 "Atomic 2 "Cache 2 "Control 1 "Examples 1 "Has 7 "Inter-CPU 1 "LOAD 1 "LOCK" 1 "Locking 1 "Locks 1 "MMIO 1 "Pending 5 root@master:~#
Web management interfaces
Although the hadoop-0.20
utility is extremely
versatile and rich, sometimes it's more convenient to use a GUI, instead.
You can attach to the namenode for file system inspection through
http://master:50070 and to the jobtracker through http://master:50030.
Through the namenode, you can inspect the HDFS, as shown in Figure 5,
where you inspect the input directory (which contains your input
data—recall from Listing 13).
Figure 5. Inspecting the HDFS through the namenode

Through the jobtracker, you can inspect running or completed jobs. In Figure 6, you can see an inspection of your last job (from Listing 14). This figure shows the various data emitted as output to the Java archive (JAR) request but also the status and number of tasks. Note here that two map tasks were performed (one for each input file) and one reduce task (to reduce the two map inputs).
Figure 6. Checking the status of a finished job

Finally, you can check on the status of your datanodes through the namenode. The namenode main page identifies the number of live and dead nodes (as links) and allows you to inspect them further. The page shown in Figure 7 shows your live datanodes in addition to statistics for each.
Figure 7. Checking the status of the live datanodes

Many other views are possible through the namenode and jobtracker Web interfaces, but for brevity, this sample set is shown. Within the namenode and jobtracker Web pages, you'll find a number of links that will take you to additional information about Hadoop configuration and operation (including run time logs).
Going further
With this installment, you've seen how a pseudo-distributed configuration from Cloudera can be transformed into a fully distributed configuration. Surprisingly few steps along with an identical interface for MapReduce applications makes Hadoop a uniquely useful tool for distributed processing. Also interesting is exploring the scalability of Hadoop. By adding new datanodes (along with updating their XML files and slave files in the master), you can easily scale Hadoop for even higher levels of parallel processing. Part 3, the final installment in this Hadoop series, will explore how to develop a MapReduce application for Hadoop.
Downloadable resources
Related topics
- Part 1 of this series, Distributed data processing with Hadoop, Part 1: Getting started (developerWorks, May 2010), showed how to install Hadoop for a pseudo-distributed configuration (in other words, running all daemons on a single node).
- Hadoop is developed through the Apache Software Foundation.
- Download IBM InfoSphere BigInsights Basic Edition at no charge and build a solution that turns large, complex volumes of data into insight by combining Apache Hadoop with unique technologies and capabilities from IBM.
- You can learn more about Cloudera at its main site. Cloudera also maintains a nice set of documentation for installing and using Hadoop (in addition to Pig and Hive, Hadoop's large data set manipulation language and data warehouse infrastructure built on Hadoop).
- Find free courses on Hadoop fundamentals, stream computing, text analytics, and more at Big Data University.
- Yahoo! provides a great set of resources for Hadoop at the developer network. In particular is the Yahoo! Hadoop Tutorial, which introduces Hadoop and provides a detailed discussion of its use and configuration.
- In "Cloud computing with Linux and Apache Hadoop" (developerWorks, October 2009), learn more about Hadoop and its architecture.
- In the developerWorks Linux zone, find hundreds of how-to articles and tutorials, as well as downloads, discussion forums, and a wealth other resources for Linux developers and administrators.
- Evaluate IBM products in the way that suits you best: Download a product trial, try a product online, use a product in a cloud environment.
- Follow developerWorks on Twitter, or subscribe to a feed of Linux tweets on developerWorks.