Distributed data processing with Hadoop, Part 2: Going further

Install and configure a multinode cluster

The first article in this series showed how to use Hadoop in a single-node cluster. This article continues with a more advanced setup that uses multiple nodes for parallel processing. It demonstrates the various node types required for multinode clusters and explores MapReduce functionality in a parallel environment. This article also digs into the management aspects of Hadoop, both command-line and Web-based.


M. Tim Jones, Independent author

M. Tim JonesM. Tim Jones is an embedded firmware architect and the author of Artificial Intelligence: A Systems Approach, GNU/Linux Application Programming (now in its second edition), AI Application Programming (in its second edition), and BSD Sockets Programming from a Multilanguage Perspective. His engineering background ranges from the development of kernels for geosynchronous spacecraft to embedded systems architecture and networking protocols development. Tim is a Consultant Engineer for Emulex Corp. in Longmont, Colorado.



03 June 2010


The true power of the Hadoop distributed computing architecture lies in its distribution. In other words, the ability to distribute work to many nodes in parallel permits Hadoop to scale to large infrastructures and, in turn, to the processing of large amounts of data. This article starts with a decomposition of a distributed Hadoop architecture and then explores distributed configuration and use.

The distributed Hadoop architecture

Recall from Part 1 in this series that all of the Hadoop daemons ran on the same host. Although this pseudo-distributed configuration did not exercise the parallel nature of Hadoop, it provided an easy way to test Hadoop's features with minimal setup. Now, let's explore the parallel nature of Hadoop using a cluster of machines.

Because Part 1's configuration ran every Hadoop daemon on a single node, let's first look at how Hadoop is naturally distributed for parallel operation. In a distributed Hadoop setup, you have a master node and some number of slave nodes (see Figure 1).

Figure 1. Hadoop master and slave node decomposition

As shown in Figure 1, the master node consists of the namenode, secondary namenode, and jobtracker daemons (the so-called master daemons). In addition, this is the node from which you manage the cluster for the purposes of this demonstration (using the Hadoop utility and browser). The slave nodes consist of the tasktracker and the datanode (the slave daemons). The distinction of this setup is that the master node contains the daemons that provide management and coordination of the Hadoop cluster, whereas the slave nodes contain the daemons that implement the storage functions for the Hadoop Distributed File System (HDFS) and the MapReduce functionality (the data-processing function).

For this demonstration, you create a master node and two slave nodes sitting on a single LAN. This setup is shown in Figure 2. Now, let's explore the installation of Hadoop for multinode distribution and its configuration.

Figure 2. Hadoop cluster configuration

To simplify the deployment, you employ virtualization, which provides a few advantages. Although performance doesn't benefit in this setting, virtualization makes it possible to create one Hadoop installation and then clone it for the other nodes. Your Hadoop cluster therefore runs the master and slave nodes as virtual machines (VMs) in the context of a hypervisor on a single host (see Figure 3).

Figure 3. Hadoop cluster configuration in a virtual environment

Upgrading Hadoop

In Part 1, you installed a special distribution of Hadoop that ran on a single node (a pseudo-distributed configuration). In this article, you update that installation for a distributed configuration. If you've begun this article series here, read through Part 1 to install the Hadoop pseudo-distributed configuration first.
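For reference only, and assuming that the Cloudera repository from Part 1 is already configured on your system, the pseudo-distributed package was installed with a command along these lines:

$ sudo apt-get install hadoop-0.20-conf-pseudo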

In the pseudo-configuration, you performed no configuration, as everything was preconfigured for a single node. Now, you need to update the configuration. First, check the current configuration using the update-alternatives command as shown in Listing 1. This command tells you that the configuration is using conf.pseudo (the highest priority).

Listing 1. Checking the current Hadoop configuration
$ update-alternatives --display hadoop-0.20-conf
hadoop-0.20-conf - status is auto.
 link currently points to /etc/hadoop-0.20/conf.pseudo
/etc/hadoop-0.20/conf.empty - priority 10
/etc/hadoop-0.20/conf.pseudo - priority 30
Current `best' version is /etc/hadoop-0.20/conf.pseudo.
$

Next, create a new configuration by copying an existing one (in this case, conf.empty, as shown in Listing 1):

$ sudo cp -r /etc/hadoop-0.20/conf.empty /etc/hadoop-0.20/conf.dist
$

Finally, activate and check the new configuration:

Listing 2. Activating and checking the Hadoop configuration
$ sudo update-alternatives --install /etc/hadoop-0.20/conf hadoop-0.20-conf \
  /etc/hadoop-0.20/conf.dist 40
$ update-alternatives --display hadoop-0.20-conf
hadoop-0.20-conf - status is auto.
 link currently points to /etc/hadoop-0.20/conf.dist
/etc/hadoop-0.20/conf.empty - priority 10
/etc/hadoop-0.20/conf.pseudo - priority 30
/etc/hadoop-0.20/conf.dist - priority 40
Current `best' version is /etc/hadoop-0.20/conf.dist.
$

Now, you have a new configuration called conf.dist that you'll use for your new distributed configuration. At this stage, because you're running in a virtualized environment, you clone this node into two additional nodes that will serve as the datanodes.
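The article doesn't depend on a particular hypervisor. As one hedged sketch, if you happened to be using KVM with libvirt, cloning the configured node could look like the following (the VM and disk image names are hypothetical); afterward, you would change each clone's hostname and assign it its own static IP address:

$ virt-clone --original hadoop-master --name hadoop-slave1 \
    --file /var/lib/libvirt/images/hadoop-slave1.img
$ virt-clone --original hadoop-master --name hadoop-slave2 \
    --file /var/lib/libvirt/images/hadoop-slave2.img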


Configuring Hadoop for distributed operation

The next step is to make all the nodes familiar with one another. You do this in the masters and slaves files under /etc/hadoop-0.20/conf.dist. The three nodes in this example are statically assigned IP addresses, as shown here (from /etc/hosts):

Listing 3. Hadoop nodes for this setup (/etc/hosts)
192.168.108.133 master
192.168.108.134 slave1
192.168.108.135 slave2

So, on the master node, you update /etc/hadoop-0.20/conf.dist/masters to identify the master node, which appears as:

master

and then identify the slave nodes in /etc/hadoop-0.20/conf.dist/slaves, which contains the following two lines:

slave1
slave2

Next, from each node, connect through Secure Shell (ssh) to each of the other nodes to ensure that passphraseless ssh is working. Each of these files (masters, slaves) is used by the Hadoop start and stop utilities that you used in Part 1 of this series.
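If passphraseless ssh isn't already in place, a minimal sketch (assuming the root account is used on every node, as in the listings that follow) is to generate a key with an empty passphrase on each node and copy its public half to the other two nodes:

$ ssh-keygen -t rsa -P "" -f ~/.ssh/id_rsa
$ ssh-copy-id root@slave1      # repeat for each of the other nodes
$ ssh-copy-id root@slave2
$ ssh slave1 hostname          # should log in and print the hostname without prompting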

Next, continue with the Hadoop-specific configuration in the /etc/hadoop-0.20/conf.dist subdirectory. The following changes are required on all nodes (master and both slaves), as defined by the Hadoop documentation. First, identify the HDFS master in the file core-site.xml (Listing 4), which defines the host and port of the namenode (note the use of the master node's host name, which /etc/hosts resolves to its IP address). The file core-site.xml defines the core properties of Hadoop.

Listing 4. Defining the HDFS master in core-site.xml
<configuration>

  <property>
    <name>fs.default.name</name>
    <value>hdfs://master:54310</value>
    <description>The name and URI of the default FS.</description>
  </property>

</configuration>

Next, identify the MapReduce jobtracker. This jobtracker could exist on its own node, but for this configuration, place it on the master node as shown in Listing 5. The file mapred-site.xml contains the MapReduce properties.

Listing 5. Defining the MapReduce jobtracker in mapred-site.xml
<configuration>

  <property>
    <name>mapred.job.tracker</name>
    <value>master:54311</value>
    <description>MapReduce jobtracker</description>
  </property>

</configuration>

Finally, define the default replication factor (Listing 6). This value defines the number of replicas that will be created and is commonly no larger than three. In this case, you define it as 2 (the number of your datanodes). This value is defined in hdfs-site.xml, which contains the HDFS properties.

Listing 6. Defining the default replication for data in hdfs-site.xml
<configuration>

  <property>
    <name>dfs.replication</name>
    <value>2</value>
    <description>Default block replication</description>
  </property>

</configuration>

The configuration items shown in Listing 4, Listing 5, and Listing 6 are the required elements for your distributed setup. Hadoop provides many more configuration options that allow you to tailor the entire environment; the Resources section provides more information on what's available.
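As one hedged example of those options (not required for this demonstration, and the path shown is only illustrative), you could tell the datanodes where to store their HDFS blocks, rather than relying on the default location under /tmp, by adding a property like this to hdfs-site.xml:

  <property>
    <name>dfs.data.dir</name>
    <value>/var/lib/hadoop-0.20/dfs/data</value>
    <description>Local directory in which datanodes store HDFS blocks</description>
  </property>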

With your configuration complete, the next step is to format your namenode (the HDFS master node). For this operation, use the hadoop-0.20 utility, specifying the namenode and operation (-format):

Listing 7. Formatting the namenode
user@master:~# sudo su -
root@master:~# hadoop-0.20 namenode -format
10/05/11 18:39:58 INFO namenode.NameNode: STARTUP_MSG: 
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = master/127.0.1.1
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 0.20.2+228
STARTUP_MSG:   build =  -r cfc3233ece0769b11af9add328261295aaf4d1ad; 
************************************************************/
10/05/11 18:39:59 INFO namenode.FSNamesystem: fsOwner=root,root
10/05/11 18:39:59 INFO namenode.FSNamesystem: supergroup=supergroup
10/05/11 18:39:59 INFO namenode.FSNamesystem: isPermissionEnabled=true
10/05/11 18:39:59 INFO common.Storage: Image file of size 94 saved in 0 seconds.
10/05/11 18:39:59 INFO common.Storage: 
  Storage directory /tmp/hadoop-root/dfs/name has been successfully formatted.
10/05/11 18:39:59 INFO namenode.NameNode: SHUTDOWN_MSG: 
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at master/127.0.1.1
************************************************************/
root@master:~#

With your namenode formatted, it's time to start the Hadoop daemons. You do this exactly as in the earlier pseudo-distributed configuration from Part 1, but here the same scripts start daemons across the whole cluster. Note that this command starts the namenode and secondary namenode on the master (as indicated by the jps command) and a datanode on each of the slaves:

Listing 8. Starting the namenode
root@master:~# /usr/lib/hadoop-0.20/bin/start-dfs.sh
starting namenode, logging to 
  /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-namenode-mtj-desktop.out
192.168.108.135: starting datanode, logging to 
  /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-datanode-mtj-desktop.out
192.168.108.134: starting datanode, logging to 
  /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-datanode-mtj-desktop.out
192.168.108.133: starting secondarynamenode, 
  logging to /usr/lib/hadoop-0.20/logs/hadoop-root-secondarynamenode-mtj-desktop.out
root@master:~# jps
7367 NameNode
7618 Jps
7522 SecondaryNameNode
root@master:~#

If you now inspect one of the slave nodes (data nodes) using jps, you'll see that a datanode daemon now exists on each node:

Listing 9. Inspecting the datanode on one of the slave nodes
root@slave1:~# jps
10562 Jps
10451 DataNode
root@slave1:~#

The next step is to start the MapReduce daemons (jobtracker and tasktracker). You do this as shown in Listing 10. Note that the script starts the jobtracker on the master node (as defined by your configuration; see Listing 5) and the tasktrackers on each slave node. A jps command on the master node shows that the jobtracker is now running.

Listing 10. Starting the MapReduce daemons
root@master:~# /usr/lib/hadoop-0.20/bin/start-mapred.sh
starting jobtracker, logging to 
  /usr/lib/hadoop-0.20/logs/hadoop-root-jobtracker-mtj-desktop.out
192.168.108.134: starting tasktracker, logging to 
  /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-tasktracker-mtj-desktop.out
192.168.108.135: starting tasktracker, logging to 
  /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-tasktracker-mtj-desktop.out
root@master:~# jps
7367 NameNode
7842 JobTracker
7938 Jps
7522 SecondaryNameNode
root@master:~#

Finally, check a slave node with jps. Here, you see that a tasktracker daemon has joined the datanode daemon on each slave node:

Listing 11. Inspecting the datanode and tasktracker on one of the slave nodes
root@slave1:~# jps
7785 DataNode
8114 Jps
7991 TaskTracker
root@slave1:~#

The relationships between the start scripts, the nodes, and the daemons that are started are shown in Figure 4. As you can see, the start-dfs script starts the namenode, secondary namenode, and datanodes, whereas the start-mapred script starts the jobtracker and tasktrackers.

Figure 4. Relationship of the start scripts and daemons for each node
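When you later need to bring the cluster down (for example, to change the configuration), the corresponding stop scripts shut the daemons down in the reverse order. From the master node, a typical sequence looks like this:

root@master:~# /usr/lib/hadoop-0.20/bin/stop-mapred.sh   # stops the jobtracker and tasktrackers
root@master:~# /usr/lib/hadoop-0.20/bin/stop-dfs.sh      # stops the namenode, secondary namenode, and datanodes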

Testing HDFS

Now that Hadoop is up and running across your cluster, you can run a couple of tests to ensure that it's operational (see Listing 12). First, issue a file system command (fs) through the hadoop-0.20 utility and request a df (disk free) operation. As with the Linux® df command, this operation simply identifies the space consumed and available for the particular device; with a newly formatted file system, you've used almost no space. Next, perform an ls operation on the root of HDFS, create a subdirectory, list its contents, and remove it. Finally, perform an fsck (file system check) on HDFS using the fsck command within the hadoop-0.20 utility. All of this tells you, along with a variety of other information (such as the fact that two datanodes were detected), that the file system is healthy.

Listing 12. Checking the HDFS
root@master:~# hadoop-0.20 fs -df
File system		Size	Used	Avail		Use%
/		16078839808	73728	3490967552	0%
root@master:~# hadoop-0.20 fs -ls /
Found 1 items
drwxr-xr-x   - root supergroup          0 2010-05-12 12:16 /tmp
root@master:~# hadoop-0.20 fs -mkdir test
root@master:~# hadoop-0.20 fs -ls test
root@master:~# hadoop-0.20 fs -rmr test
Deleted hdfs://192.168.108.133:54310/user/root/test
root@master:~# hadoop-0.20 fsck /
.Status: HEALTHY
 Total size:	4 B
 Total dirs:	6
 Total files:	1
 Total blocks (validated):	1 (avg. block size 4 B)
 Minimally replicated blocks:	1 (100.0 %)
 Over-replicated blocks:	0 (0.0 %)
 Under-replicated blocks:	0 (0.0 %)
 Mis-replicated blocks:		0 (0.0 %)
 Default replication factor:	2
 Average block replication:	2.0
 Corrupt blocks:		0
 Missing replicas:		0 (0.0 %)
 Number of data-nodes:		2
 Number of racks:		1

The filesystem under path '/' is HEALTHY
root@master:~#

Performing a MapReduce job

The next step is to perform a MapReduce job to validate that your entire setup is working properly (see Listing 13). The first step of this process is to introduce some data. So, begin by creating a directory to hold your input data (called input), which you do using the hadoop-0.20 utility with the mkdir command. Then, use the put command of hadoop-0.20 to put two files into HDFS. You can check the contents of the input directory using the ls command of the Hadoop utility.

Listing 13. Generating input data
root@master:~# hadoop-0.20 fs -mkdir input
root@master:~# hadoop-0.20 fs -put \
  /usr/src/linux-source-2.6.27/Doc*/memory-barriers.txt input
root@master:~# hadoop-0.20 fs -put \
  /usr/src/linux-source-2.6.27/Doc*/rt-mutex-design.txt input
root@master:~# hadoop-0.20 fs -ls input
Found 2 items
-rw-r--r--  2 root supergroup  78031 2010-05-12 14:16 /user/root/input/memory-barriers.txt
-rw-r--r--  2 root supergroup  33567 2010-05-12 14:16 /user/root/input/rt-mutex-design.txt
root@master:~#

Next, kick off the wordcount MapReduce job. As in the pseudo-distributed model, you specify your input subdirectory (which contains the input files) and the output directory (which doesn't exist but will be created by the namenode and populated with the result data):

Listing 14. Running the MapReduce wordcount job on the cluster
root@master:~# hadoop-0.20 jar \
  /usr/lib/hadoop-0.20/hadoop-0.20.2+228-examples.jar wordcount input output
10/05/12 19:04:37 INFO input.FileInputFormat: Total input paths to process : 2
10/05/12 19:04:38 INFO mapred.JobClient: Running job: job_201005121900_0001
10/05/12 19:04:39 INFO mapred.JobClient:  map 0% reduce 0%
10/05/12 19:04:59 INFO mapred.JobClient:  map 50% reduce 0%
10/05/12 19:05:08 INFO mapred.JobClient:  map 100% reduce 16%
10/05/12 19:05:17 INFO mapred.JobClient:  map 100% reduce 100%
10/05/12 19:05:19 INFO mapred.JobClient: Job complete: job_201005121900_0001
10/05/12 19:05:19 INFO mapred.JobClient: Counters: 17
10/05/12 19:05:19 INFO mapred.JobClient:   Job Counters 
10/05/12 19:05:19 INFO mapred.JobClient:     Launched reduce tasks=1
10/05/12 19:05:19 INFO mapred.JobClient:     Launched map tasks=2
10/05/12 19:05:19 INFO mapred.JobClient:     Data-local map tasks=2
10/05/12 19:05:19 INFO mapred.JobClient:   FileSystemCounters
10/05/12 19:05:19 INFO mapred.JobClient:     FILE_BYTES_READ=47556
10/05/12 19:05:19 INFO mapred.JobClient:     HDFS_BYTES_READ=111598
10/05/12 19:05:19 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=95182
10/05/12 19:05:19 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=30949
10/05/12 19:05:19 INFO mapred.JobClient:   Map-Reduce Framework
10/05/12 19:05:19 INFO mapred.JobClient:     Reduce input groups=2974
10/05/12 19:05:19 INFO mapred.JobClient:     Combine output records=3381
10/05/12 19:05:19 INFO mapred.JobClient:     Map input records=2937
10/05/12 19:05:19 INFO mapred.JobClient:     Reduce shuffle bytes=47562
10/05/12 19:05:19 INFO mapred.JobClient:     Reduce output records=2974
10/05/12 19:05:19 INFO mapred.JobClient:     Spilled Records=6762
10/05/12 19:05:19 INFO mapred.JobClient:     Map output bytes=168718
10/05/12 19:05:19 INFO mapred.JobClient:     Combine input records=17457
10/05/12 19:05:19 INFO mapred.JobClient:     Map output records=17457
10/05/12 19:05:19 INFO mapred.JobClient:     Reduce input records=3381
root@master:~#

The final step is to explore the output data. Because you ran the wordcount MapReduce job, the result is a single file (reduced from the map task outputs). This file contains a list of tuples representing the words found in the input files and the number of times each appeared across all input files:

Listing 15. Inspecting the output of the MapReduce job
root@master:~# hadoop-0.20 fs -ls output
Found 2 items
drwxr-xr-x   - root supergroup          0 2010-05-12 19:04 /user/root/output/_logs
-rw-r--r--   2 root supergroup      30949 2010-05-12 19:05 /user/root/output/part-r-00000
root@master:~# hadoop-0.20 fs -cat output/part-r-00000 | head -13
!=	1
"Atomic	2
"Cache	2
"Control	1
"Examples	1
"Has	7
"Inter-CPU	1
"LOAD	1
"LOCK"	1
"Locking	1
"Locks	1
"MMIO	1
"Pending	5
root@master:~#

Web management interfaces

Although the hadoop-0.20 utility is extremely versatile and rich, sometimes it's more convenient to use a GUI instead. You can attach to the namenode for file system inspection through http://master:50070 and to the jobtracker through http://master:50030. Through the namenode, you can inspect HDFS, as shown in Figure 5, where you inspect the input directory (which contains your input data; recall Listing 13).

Figure 5. Inspecting the HDFS through the namenode

Through the jobtracker, you can inspect running or completed jobs. In Figure 6, you can see an inspection of your last job (from Listing 14). This figure shows not only the various counters emitted for the Java archive (JAR) request but also the status and number of tasks. Note here that two map tasks were performed (one for each input file) and one reduce task (to reduce the two map outputs).

Figure 6. Checking the status of a finished job

Finally, you can check on the status of your datanodes through the namenode. The namenode main page identifies the number of live and dead nodes (as links) and allows you to inspect them further. The page shown in Figure 7 shows your live datanodes in addition to statistics for each.

Figure 7. Checking the status of the live datanodes

Many other views are possible through the namenode and jobtracker Web interfaces, but for brevity, only this sample set is shown. Within the namenode and jobtracker Web pages, you'll find a number of links that take you to additional information about Hadoop configuration and operation (including run-time logs).
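If you prefer to stay at the command line, you can still verify that the two Web interfaces are reachable. As a small sketch (the exact pages served vary by Hadoop release), the following checks the HTTP status returned by the namenode and jobtracker ports from the master node; a 200 response means the corresponding daemon's embedded Web server is up:

root@master:~# curl -s -o /dev/null -w "%{http_code}\n" http://master:50070/   # namenode UI
root@master:~# curl -s -o /dev/null -w "%{http_code}\n" http://master:50030/   # jobtracker UI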


Going further

With this installment, you've seen how a pseudo-distributed configuration from Cloudera can be transformed into a fully distributed configuration. Surprisingly few steps are needed, and because the interface for MapReduce applications is identical, Hadoop is a uniquely useful tool for distributed processing. Also interesting is the scalability of Hadoop: by adding new datanodes (along with updating their XML configuration files and the slaves file on the master), you can easily scale Hadoop for even higher levels of parallel processing. Part 3, the final installment in this Hadoop series, will explore how to develop a MapReduce application for Hadoop.
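As a rough sketch of that scaling step (the host name slave3 is hypothetical, and the new node also needs the same configuration, /etc/hosts entries, and passphraseless ssh as the existing slaves), adding a third datanode amounts to listing it in the slaves file, rerunning the start scripts, and optionally rebalancing existing blocks:

root@master:~# echo "slave3" >> /etc/hadoop-0.20/conf.dist/slaves
root@master:~# /usr/lib/hadoop-0.20/bin/start-dfs.sh      # starts a datanode on the new node; running daemons are left alone
root@master:~# /usr/lib/hadoop-0.20/bin/start-mapred.sh   # starts a tasktracker on the new node
root@master:~# hadoop-0.20 balancer                       # optional: spread existing blocks onto the new node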

Resources

Learn

Get products and technologies

Discuss

  • Get involved in the My developerWorks community. Connect with other developerWorks users while exploring the developer-driven blogs, forums, groups, and wikis.
