Distributed data processing with Hadoop, Part 1

Getting started

Install and configure a simple cluster


Although Hadoop is the core of data reduction for some of the largest search engines, it's better described as a framework for the distributed processing of data. And not just data, but massive amounts of data, as would be required for search engines and the crawled data they collect. As a distributed framework, Hadoop enables many applications that benefit from parallelization of data processing.

This article is not meant to introduce you to Hadoop and its architecture but rather to demonstrate a simple Hadoop setup. In the Related topics section, you can find more details on Hadoop architecture, components, and theory of operation. With that disclaimer in place, let's dive right into Hadoop installation and configuration.

Initial setup

For this demonstration, we'll use the Cloudera Hadoop distribution. You'll find support for a variety of different Linux® distributions there, so it's ideal for getting started.

This article assumes first that your system has Java™ technology (minimum release 1.6) and cURL installed. If not, you need to add those first (see the Related topics section for more information on this installation).
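
As a quick way to confirm both prerequisites before going further, a minimal sketch such as the following may help. The function name missing_prereqs is my own invention for illustration, and the check only tests whether a command is on the PATH, not whether Java meets the 1.6 minimum.

```shell
# Print the name of each command that is NOT found on the PATH.
# Sketch only: it checks for presence, not for a minimum version.
missing_prereqs() {
    for cmd in "$@"; do
        command -v "$cmd" >/dev/null 2>&1 || printf '%s\n' "$cmd"
    done
}
```

Running missing_prereqs java curl prints nothing when both tools are present. You can then run java -version by hand to confirm the release.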

Because I'm running on Ubuntu (the Intrepid release), I use the apt utility to grab the Hadoop distribution. This process is quite simple and allows me to grab the binary package without the additional details of downloading and building the source. First, I tell apt about the Cloudera site by creating a new file, /etc/apt/sources.list.d/cloudera.list, and adding the following text:

deb intrepid-cdh3 contrib
deb-src intrepid-cdh3 contrib

If you're running Jaunty or another release, just replace intrepid with your specific release name (current support includes Hardy, Intrepid, Jaunty, Karmic, and Lenny).
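
If you're unsure whether your release is on that list, a small check like the one below is one way to find out. The function name is_supported_release is hypothetical, and the list of codenames is simply the one given above, so it may be out of date for newer repositories.

```shell
# Succeed (exit status 0) if the given codename is one of the releases
# listed above as supported; fail (exit status 1) otherwise.
is_supported_release() {
    case "$1" in
        hardy|intrepid|jaunty|karmic|lenny) return 0 ;;
        *) return 1 ;;
    esac
}
```

On a system that provides lsb_release, you could call it as is_supported_release "$(lsb_release -cs)" && echo supported.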

Next, I grab the apt-key from Cloudera to validate the downloaded package:

$ curl -s | \
sudo apt-key add -
$ sudo apt-get update

And then install Hadoop for a pseudo-distributed configuration (all of the Hadoop daemons run on a single host):

$ sudo apt-get install hadoop-0.20-conf-pseudo

Note that this configuration is around 23MB (not including any other packages apt pulls in that may not be present). This installation is ideal for playing with Hadoop and learning about its elements and interfaces.

Finally, I set up passphrase-less SSH. If you try to use ssh localhost and a passphrase is requested, you'll need to perform the following steps. I assume that this is a dedicated Hadoop box, as this step does have some security implications (see Listing 1).

Listing 1. Setting up for passphrase-less SSH
$ sudo su -
# ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
# cat ~/.ssh/ >> ~/.ssh/authorized_keys

One final note is that you need to ensure that your host has sufficient storage available for the datanode (the cache). Insufficient storage manifests itself in strange ways (such as errors indicating the inability to replicate to a node).
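
To see how much room you have, a sketch along these lines reports the free space on the file system that holds a given directory. The name free_kb is my own, and which directory the datanode actually uses depends on your configuration, so pass whatever path applies to your setup.

```shell
# Print the free space, in kilobytes, of the file system holding a path.
# -P forces the portable one-line-per-file-system format; -k reports KB.
# With no argument, the root file system is checked.
free_kb() {
    df -Pk "${1:-/}" | awk 'NR == 2 { print $4 }'
}
```

For example, free_kb /var prints a single number that you can compare against the amount of data you plan to load.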

Starting Hadoop

Now, you're ready to start Hadoop, which you effectively do by starting each of the Hadoop daemons. But first, format your Hadoop File System (HDFS) using the hadoop command. The hadoop command has a number of uses, some of which we'll explore shortly.

First, ask the namenode to format the distributed file system (DFS). You do this as part of the installation, but it's useful to know if you ever need to generate a clean file system.

# hadoop-0.20 namenode -format

After you acknowledge the request, the file system is formatted and some information is returned. Next, start the Hadoop daemons. Hadoop starts five daemons in this pseudo-distributed configuration: namenode, secondarynamenode, datanode, jobtracker, and tasktracker. When each of the daemons has been started, you'll see a small amount of text emitted for each (identifying where its logs are stored). Each daemon is started to run in the background (as a daemon). Figure 1 illustrates what the pseudo-distributed node looks like once the startup is complete.

Figure 1. Pseudo-distributed Hadoop configuration
Block diagram of pseudo-distributed Hadoop configuration

Hadoop provides some helper tools to simplify its startup. These tools are categorized as start (such as start-dfs) and stop (such as stop-dfs). The following short script illustrates how to start the Hadoop node:

# /usr/lib/hadoop-0.20/bin/
# /usr/lib/hadoop-0.20/bin/

To verify that the daemons are running, you can use the jps command (which is a ps utility for JVM processes). This command lists the five daemons and their process identifiers.
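
Rather than reading the jps output by eye, you could filter it with a sketch like the following. It assumes that jps prints one process per line as a process identifier followed by the class name, and that the five daemons appear under the class names shown in the loop. The function name missing_daemons is hypothetical.

```shell
# Read jps output ("<pid> <class name>" per line) on standard input and
# print the name of each expected Hadoop daemon that does not appear.
missing_daemons() {
    jps_output=$(cat)
    for daemon in NameNode SecondaryNameNode DataNode JobTracker TaskTracker; do
        # Compare the whole second field so that NameNode does not
        # accidentally match SecondaryNameNode.
        if ! printf '%s\n' "$jps_output" |
            awk -v d="$daemon" '$2 == d { found = 1 } END { exit !found }'; then
            printf '%s\n' "$daemon"
        fi
    done
}
```

With the daemons started, jps | missing_daemons should print nothing. Any name it does print is a daemon whose log is worth checking.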

Now that the Hadoop daemons are running, let's look back at them to introduce what each accomplishes in the Hadoop framework. The namenode is the master server in Hadoop and manages the file system namespace and access to the files stored in the cluster. There's also a secondary namenode, which isn't a redundant daemon for the namenode but instead provides periodic checkpointing and housekeeping tasks. You'll find one namenode and one secondary namenode in a Hadoop cluster.

The datanode manages the storage attached to a node, and a cluster can contain multiple such nodes. Each node storing data will have a datanode daemon running.

Finally, each cluster will have a single jobtracker that is responsible for scheduling work at the datanodes and a tasktracker per datanode that performs the actual work. The jobtracker and tasktracker behave in a master-slave arrangement, where the jobtracker distributes work across the datanodes and the tasktracker performs the task. The jobtracker also validates requested work and, if a datanode fails for some reason, reschedules the previous task.

In this simple configuration, all daemons reside on the same node (see Figure 1). But from the previous discussion, it's easy to see how Hadoop provides parallel processing of work. Although the architecture is simple, Hadoop provides an easy way to distribute data, balance load, and process large amounts of data in parallel in a fault-tolerant way.

Inspecting HDFS

You can perform a couple of tests to ensure that Hadoop is up and running normally (at least the namenode). Knowing that all of your processes are available, you can use the hadoop command to inspect the local namespace (see Listing 2).

Listing 2. Checking access to the HDFS
# hadoop-0.20 fs -ls /
Found 2 items
drwxr-xr-x   - root supergroup          0 2010-04-29 16:38 /user
drwxr-xr-x   - root supergroup          0 2010-04-29 16:28 /var

From this, you can see that the namenode is up and able to service the local namespace. Notice that you're using a command called hadoop-0.20 to inspect the file system. This utility is how you interact with the Hadoop cluster, from inspecting the file system to running jobs in the cluster. Note the command structure here: After specifying the hadoop-0.20 utility, you define a command (in this case, the generic file system shell) and one or more options (in this case, you request a file list using ls). Because the hadoop-0.20 utility is one of your primary interfaces to the Hadoop cluster, you'll see it used quite a bit throughout this article. Listing 3 provides some additional file system operations with which you can explore this interface a bit further (creating a new subdirectory called test, listing its contents, and then removing it).

Listing 3. Exploring file system manipulation in Hadoop
# hadoop-0.20 fs -mkdir test
# hadoop-0.20 fs -ls test
# hadoop-0.20 fs -rmr test
Deleted hdfs://localhost/user/root/test
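
Because every file system operation repeats the same utility and command before the options, you might wrap that prefix in a small function. This is only a convenience sketch: hfs is a name I made up, and HADOOP_CMD is a hypothetical override variable (not something Hadoop itself reads) that defaults to the hadoop-0.20 utility.

```shell
# Wrapper around the generic file system shell: utility, then the fs
# command, then whatever options the caller supplies.
# HADOOP_CMD is a hypothetical override introduced for this sketch.
hfs() {
    "${HADOOP_CMD:-hadoop-0.20}" fs "$@"
}
```

With this in place, hfs -ls / is equivalent to the first command in Listing 2.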

Testing Hadoop

Now that you have installed Hadoop and tested the basic interface to its file system, it's time to test Hadoop in a real application. In this example, you see the MapReduce process on a small set of data. Map and reduce are named after functions in functional programming but provide the core capability for data reduction. Map refers to the process of chopping input into a smaller set of sub-problems for processing (where these sub-problems are distributed to parallel workers). Reduce refers to the assembly of answers from the sub-problems into a single set of output. Note that I haven't defined what processing means here, as the framework permits you to define this yourself. The canonical MapReduce example is the calculation of word frequency in a set of documents.
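
To make the two phases concrete, here is a local, single-process imitation of word frequency using ordinary shell tools. It is a sketch of the idea rather than of Hadoop's implementation: the names wc_map and wc_reduce are mine, words are assumed to be separated by white space, and the sort in the middle stands in for the step that groups identical keys before the reduce.

```shell
# Map: split the input into words and emit one "<word><TAB>1" pair per word.
wc_map() {
    tr -s '[:space:]' '\n' | awk 'NF { print $0 "\t1" }'
}

# Reduce: given pairs sorted by word, add up the counts of each run of
# identical words and emit "<word><TAB><total>".
wc_reduce() {
    awk -F '\t' '
        NR > 1 && $1 != key { print key "\t" sum; sum = 0 }
        { key = $1; sum += $2 }
        END { if (NR > 0) print key "\t" sum }
    '
}
```

You could run it on any local text file with wc_map < file.txt | LC_ALL=C sort | wc_reduce. In Hadoop, the map and reduce steps would instead be spread across workers.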

Per the previous discussion, you'll have a set of inputs and a resulting set of outputs. The first step is to create an input subdirectory in the file system into which you'll drop your work. You do this using:

# hadoop-0.20 fs -mkdir input

Next, drop some work in the input subdirectory. In this case, use the put command, which copies a file from the local file system into the HDFS (see Listing 4). Note the format below, which copies the source file to the HDFS subdirectory (input). Once done, you'll have two text files in HDFS ready to be processed.

Listing 4. Moving files into HDFS
# hadoop-0.20 fs -put /usr/src/linux-source-2.6.27/Doc*/memory-barriers.txt  input
# hadoop-0.20 fs -put /usr/src/linux-source-2.6.27/Doc*/rt-mutex-design.txt  input

Next, you can check for the presence of the files using the ls command (see Listing 5).

Listing 5. Checking files in HDFS
# hadoop-0.20 fs -ls input
Found 2 items
-rw-r--r--  1 root supergroup 78031 2010-04-29 17:35 /user/root/input/memory-barriers.txt
-rw-r--r--  1 root supergroup 33567 2010-04-29 17:36 /user/root/input/rt-mutex-design.txt 

With your work safely in HDFS, you can perform the MapReduce function. This function requires a single command but a long request, as shown in Listing 6. This command requests the execution of a JAR. The JAR actually implements a number of capabilities, but this example focuses on wordcount. The jobtracker daemon requests that the datanode perform the MapReduce job, which results in a considerable amount of output (smaller here, because you're only processing two files). The output shows the progress of the map and reduce functions, and then provides some useful statistics regarding the I/O for both file system and records processing.

Listing 6. Performing a MapReduce job for word frequency (wordcount)
# hadoop-0.20 jar /usr/lib/hadoop-0.20/hadoop-0.20.2+228-examples.jar \
wordcount input output
10/04/29 17:36:49 INFO input.FileInputFormat: Total input paths to process : 2
10/04/29 17:36:49 INFO mapred.JobClient: Running job: job_201004291628_0009
10/04/29 17:36:50 INFO mapred.JobClient:  map 0% reduce 0%
10/04/29 17:37:00 INFO mapred.JobClient:  map 100% reduce 0%
10/04/29 17:37:06 INFO mapred.JobClient:  map 100% reduce 100%
10/04/29 17:37:08 INFO mapred.JobClient: Job complete: job_201004291628_0009
10/04/29 17:37:08 INFO mapred.JobClient: Counters: 17
10/04/29 17:37:08 INFO mapred.JobClient:   Job Counters 
10/04/29 17:37:08 INFO mapred.JobClient:     Launched reduce tasks=1
10/04/29 17:37:08 INFO mapred.JobClient:     Launched map tasks=2
10/04/29 17:37:08 INFO mapred.JobClient:     Data-local map tasks=2
10/04/29 17:37:08 INFO mapred.JobClient:   FileSystemCounters
10/04/29 17:37:08 INFO mapred.JobClient:     FILE_BYTES_READ=47556
10/04/29 17:37:08 INFO mapred.JobClient:     HDFS_BYTES_READ=111598
10/04/29 17:37:08 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=95182
10/04/29 17:37:08 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=30949
10/04/29 17:37:08 INFO mapred.JobClient:   Map-Reduce Framework
10/04/29 17:37:08 INFO mapred.JobClient:     Reduce input groups=2974
10/04/29 17:37:08 INFO mapred.JobClient:     Combine output records=3381
10/04/29 17:37:08 INFO mapred.JobClient:     Map input records=2937
10/04/29 17:37:08 INFO mapred.JobClient:     Reduce shuffle bytes=47562
10/04/29 17:37:08 INFO mapred.JobClient:     Reduce output records=2974
10/04/29 17:37:08 INFO mapred.JobClient:     Spilled Records=6762
10/04/29 17:37:08 INFO mapred.JobClient:     Map output bytes=168718
10/04/29 17:37:08 INFO mapred.JobClient:     Combine input records=17457
10/04/29 17:37:08 INFO mapred.JobClient:     Map output records=17457
10/04/29 17:37:08 INFO mapred.JobClient:     Reduce input records=3381
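
If you save this output to a file, you can pull individual statistics back out of it. The sketch below assumes the line layout shown in Listing 6 (a counter name, an equals sign, and a number after the mapred.JobClient prefix). The function name job_counter is hypothetical.

```shell
# Read JobClient log lines on standard input and print the value of the
# counter whose name is given as the first argument.
job_counter() {
    sed -n "s/.*INFO mapred\.JobClient: *$1=\([0-9][0-9]*\) *\$/\1/p"
}
```

For example, job_counter 'Map output records' < job.log prints 17457 for the run in Listing 6, where job.log is whatever file you captured the output in. I believe the job messages are written to standard error, so you may need 2>&1 when capturing them.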

With the processing complete, inspect the result. Recall that the point of the job is to calculate the number of times words occurred in the input files. This output is emitted as a file of tuples, representing the word and the number of times it appeared in the input. You can use the cat command (after finding the particular output file) through the hadoop-0.20 utility to emit this data (see Listing 7).

Listing 7. Reviewing the output from the MapReduce wordcount operation
# hadoop-0.20 fs -ls /user/root/output
Found 2 items
drwxr-xr-x   - root supergroup          0 2010-04-29 17:36 /user/root/output/_logs
-rw-r--r--   1 root supergroup      30949 2010-04-29 17:37 /user/root/output/part-r-00000
# hadoop-0.20 fs -cat output/part-r-00000 | head -13
!= 1
"Atomic 2
"Cache 2
"Control 1
"Examples 1
"Has 7
"Inter-CPU 1
"LOCK" 1
"Locking 1
"Locks 1
"Pending 5

You can also extract the file from HDFS using the hadoop-0.20 utility (see Listing 8). You do this easily with the get utility (analogous to the put you executed earlier to write files into HDFS). For the get operation, specify the file in HDFS to extract (from your output subdirectory) and the file name to write in the local file system (output.txt).

Listing 8. Extracting the output from HDFS
# hadoop-0.20 fs -get output/part-r-00000 output.txt
# cat output.txt | head -5
!= 1
"Atomic 2
"Cache 2
"Control 1
"Examples 1

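Once the results are in a local file, ordinary tools can rank them. The following sketch assumes each line holds a word followed by its count, separated by white space, and that words contain no white space themselves. The name top_words is my own.

```shell
# Read "<word> <count>" lines on standard input and print the N most
# frequent words (default 10), breaking ties alphabetically.
top_words() {
    LC_ALL=C sort -k2,2nr -k1,1 | head -n "${1:-10}"
}
```

For instance, top_words 5 < output.txt lists the five most frequent words from the wordcount job.
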
Let's look at another example using the same JAR but a different use (here, you'll explore a parallel grep). For this test, use your existing input files but remove the output subdirectory to recreate it for this test:

# hadoop-0.20 fs -rmr output
Deleted hdfs://localhost/user/root/output

Next, request the MapReduce job for grep. In this case, the grep is performed in parallel (the map), and then the grep results are combined (the reduce). Listing 9 provides the output for this use model (but in this case, some of the output has been pruned for brevity). Note the command request here, where your request is a grep taking input from the subdirectory called input and placing the result in a subdirectory called output. The final parameter is the string you're searching for (in this case, 'kernel').

Listing 9. Performing a MapReduce Job for word search count (grep)
# hadoop-0.20 jar /usr/lib/hadoop/hadoop-0.20.2+228-examples.jar \
grep input output 'kernel'
10/04/30 09:22:29 INFO mapred.FileInputFormat: Total input paths to process : 2
10/04/30 09:22:30 INFO mapred.JobClient: Running job: job_201004291628_0010
10/04/30 09:22:31 INFO mapred.JobClient:  map 0% reduce 0%
10/04/30 09:22:42 INFO mapred.JobClient:  map 66% reduce 0%
10/04/30 09:22:45 INFO mapred.JobClient:  map 100% reduce 0%
10/04/30 09:22:54 INFO mapred.JobClient:  map 100% reduce 100%
10/04/30 09:22:56 INFO mapred.JobClient: Job complete: job_201004291628_0010
10/04/30 09:22:56 INFO mapred.JobClient: Counters: 18
10/04/30 09:22:56 INFO mapred.JobClient:   Job Counters 
10/04/30 09:22:56 INFO mapred.JobClient:     Launched reduce tasks=1
10/04/30 09:22:56 INFO mapred.JobClient:     Launched map tasks=3
10/04/30 09:22:56 INFO mapred.JobClient:     Data-local map tasks=3
10/04/30 09:22:56 INFO mapred.JobClient:   FileSystemCounters
10/04/30 09:22:56 INFO mapred.JobClient:     FILE_BYTES_READ=57
10/04/30 09:22:56 INFO mapred.JobClient:     HDFS_BYTES_READ=113144
10/04/30 09:22:56 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=222
10/04/30 09:22:56 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=109
10/04/30 09:23:14 INFO mapred.JobClient:     Map output bytes=15
10/04/30 09:23:14 INFO mapred.JobClient:     Map input bytes=23
10/04/30 09:23:14 INFO mapred.JobClient:     Combine input records=0
10/04/30 09:23:14 INFO mapred.JobClient:     Map output records=1
10/04/30 09:23:14 INFO mapred.JobClient:     Reduce input records=1

With the job complete, inspect the output directory (to identify the results file), and then perform a file system cat operation to view its contents (see Listing 10).

Listing 10. Inspecting the output of the MapReduce job
# hadoop-0.20 fs -ls output
Found 2 items
drwxr-xr-x  - root supergroup    0 2010-04-30 09:22 /user/root/output/_logs
-rw-r--r--  1 root supergroup   10 2010-04-30 09:23 /user/root/output/part-00000
# hadoop-0.20 fs -cat output/part-00000
17 kernel
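
If you want to sanity-check a result like this against the local copies of the input files, a rough local equivalent can be sketched with standard tools. This assumes your grep supports the -o option (GNU and BSD versions do) and that the job counts every match rather than every matching line. Regular expression dialects differ between tools, so treat any disagreement as a prompt to look closer rather than as an error. The name count_matches is hypothetical.

```shell
# Read text on standard input and print "<count><TAB><match>" for each
# distinct string matched by the extended regular expression in $1,
# most frequent first.
count_matches() {
    grep -oE -- "$1" | LC_ALL=C sort | uniq -c | LC_ALL=C sort -rn |
        awk '{ n = $1; sub(/^ *[0-9]+ /, ""); print n "\t" $0 }'
}
```

You could run it as cat memory-barriers.txt rt-mutex-design.txt | count_matches 'kernel' from the directory that holds the two source files.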

Web-based interfaces

You've seen how to inspect the HDFS, but if you're looking for information about Hadoop's operation, you'll find the Web interfaces useful. Recall that at the top of the Hadoop cluster is the namenode, which manages the HDFS. You can explore high-level details of the file system (such as available and used space and available datanodes) as well as the running jobs through http://localhost:50070. You can dig deeper into the jobtracker (job status) through http://localhost:50030. Note that in both of these cases, you reference localhost, because all daemons are running on the same host.
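
If you'd rather confirm from the command line that both interfaces are answering, something like the following sketch could do it with cURL. The function names are my own, the ports are the two given above, and the host defaults to localhost to match the pseudo-distributed setup.

```shell
# Print "<name> <url>" for the two web interfaces described above.
hadoop_ui_urls() {
    host=${1:-localhost}
    printf 'namenode http://%s:50070\n' "$host"
    printf 'jobtracker http://%s:50030\n' "$host"
}

# Print "<name> <HTTP status code>" for each interface. This needs cURL
# and a running cluster, so it is defined here but not called.
check_hadoop_uis() {
    hadoop_ui_urls "$1" | while read -r name url; do
        printf '%s %s\n' "$name" "$(curl -s -o /dev/null -w '%{http_code}' "$url")"
    done
}
```

Calling check_hadoop_uis with no argument checks localhost. A status of 000 generally means that cURL could not connect at all.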

Going further

This article explored the installation and initial configuration of a simple (pseudo-distributed) Hadoop cluster (in this case, using Cloudera's distribution for Hadoop). I chose this particular distribution because it simplified the installation and initial configuration of Hadoop. You can find a number of distributions for Hadoop (including the source); see the Related topics section for more information.

But what if you lack the hardware resources to scale your Hadoop cluster for your specific needs? It turns out that Hadoop is so popular, you can easily run it within cloud computing infrastructures using pre-built Hadoop VMs and leased servers. Amazon provides Amazon Machine Images (AMIs) as well as compute resources within the Amazon Elastic Compute Cloud (Amazon EC2). Additionally, Microsoft recently announced coming support for Hadoop within its Windows® Azure Services Platform.

From this article, it's easy to see how Hadoop makes distributed computing simple for processing large datasets. The next article in this series will explore how to configure Hadoop in a multi-node cluster with additional examples. See you then!

Related topics

