Distributed data processing with Hadoop, Part 1
Install and configure a simple cluster
Although Hadoop is the core of data reduction for some of the largest search engines, it's better described as a framework for the distributed processing of data. And not just data, but massive amounts of data, as would be required for search engines and the crawled data they collect. As a distributed framework, Hadoop enables many applications that benefit from parallelization of data processing.
This article is not meant to introduce you to Hadoop and its architecture but rather to demonstrate a simple Hadoop setup. In the Related topics section, you can find more details on Hadoop architecture, components, and theory of operation. With that disclaimer in place, let's dive right into Hadoop installation and configuration.
For this demonstration, we'll use the Cloudera Hadoop distribution. You'll find support for a variety of different Linux® distributions there, so it's ideal for getting started.
This article assumes first that your system has Java™ technology (minimum release 1.6) and cURL installed. If not, you need to add those first (see the Related topics section for more information on this installation).
Because I'm running on Ubuntu (the Intrepid release), I use the
apt utility to grab the Hadoop distribution.
This process is quite simple and allows me to grab the binary package
without the additional details of downloading and building the source.
First, I tell
apt about the Cloudera site. I
then create a new file in /etc/apt/sources.list.d/cloudera.list and add
the following text:
deb http://archive.cloudera.com/debian intrepid-cdh3 contrib
deb-src http://archive.cloudera.com/debian intrepid-cdh3 contrib
If you're running Jaunty or another release, just replace intrepid with your specific release name (current support includes Hardy, Intrepid, Jaunty, Karmic, and Lenny).
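To make the release substitution concrete, here is a small shell sketch that prints the two repository lines for whatever release name you pass in. The function name cloudera_sources_list is my own invention for illustration (it is not part of any Cloudera tooling); only the URL and the -cdh3 suffix come from the lines shown above.

```shell
#!/bin/sh
# Hypothetical helper: print the apt source lines for one release name.
# The URL and the "-cdh3 contrib" suffix are copied from the example above.
cloudera_sources_list() {
    release="$1"
    printf 'deb http://archive.cloudera.com/debian %s-cdh3 contrib\n' "$release"
    printf 'deb-src http://archive.cloudera.com/debian %s-cdh3 contrib\n' "$release"
}

# Example: show what the file would contain for Jaunty.
cloudera_sources_list jaunty
```

If the output looks right, you could write it into place with something like cloudera_sources_list jaunty | sudo tee /etc/apt/sources.list.d/cloudera.list.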
Next, I grab the apt-key from Cloudera to validate the downloaded package:
$ curl -s http://archive.cloudera.com/debian/archive.key | \
  sudo apt-key add -
$ sudo apt-get update
And then install Hadoop for a pseudo-distributed configuration (all of the Hadoop daemons run on a single host):
$ sudo apt-get install hadoop-0.20-conf-pseudo
$
Note that this configuration is around 23MB (not including any other packages
that apt pulls in because they are not already present).
This installation is ideal for playing with Hadoop and learning about its
elements and interfaces.
Finally, I set up passphrase-less SSH. If you try to use
ssh localhost and a passphrase is requested,
you'll need to perform the following steps. I assume that this is a
dedicated Hadoop box, as this step does have some security implications
(see Listing 1).
Listing 1. Setting up for passphrase-less SSH
$ sudo su -
# ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
# cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
One final note is that you need to ensure that your host has sufficient storage available for the datanode (the cache). Insufficient storage manifests itself in strange ways (such as errors indicating the inability to replicate to a node).
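Both of these prerequisites can be checked from a script before you start any daemons. The following is a minimal sketch, not a definitive test: the function names are hypothetical, and the path passed to free_kb is only a placeholder, because the directory that actually holds datanode blocks depends on your configuration. The SSH check relies on the BatchMode option, which makes ssh fail rather than prompt; it also fails if the host key for localhost has not been accepted yet.

```shell
#!/bin/sh
# Hypothetical pre-flight helpers for a single-host Hadoop setup.

# Succeeds only if "ssh localhost" works without any interactive prompt.
ssh_is_passphraseless() {
    ssh -o BatchMode=yes -o ConnectTimeout=5 localhost true
}

# Print the free space, in 1K blocks, of the file system holding a directory.
# "df -P" selects the portable format, where column 4 is "Available".
free_kb() {
    df -Pk "$1" | awk 'NR==2 {print $4}'
}

# Example: report free space for a placeholder path.
printf 'free KB on /: %s\n' "$(free_kb /)"
```

You would call ssh_is_passphraseless as the same user that starts the daemons (root in this article) and compare the free_kb result against whatever minimum you consider safe for your data.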
Now, you're ready to start Hadoop, which you effectively do by starting
each of the Hadoop daemons. But first, format your Hadoop File System
(HDFS) using the
hadoop command. The
hadoop command has a number of uses, some of
which we'll explore shortly.
First, request the namenode to format the DFS file system. You do this as part of the installation, but it's useful to know if you ever need to generate a clean file system.
# hadoop-0.20 namenode -format
After you acknowledge the request, the file system is formatted and some information is returned. Next, start the Hadoop daemons. Hadoop starts five daemons in this pseudo-distributed configuration: namenode, secondarynamenode, datanode, jobtracker, and tasktracker. When each of the daemons has been started, you'll see a small amount of text emitted for each (identifying where its logs are stored). Each daemon is started to run in the background (as a daemon). Figure 1 illustrates what the pseudo-distributed node looks like once the startup is complete.
Figure 1. Pseudo-distributed Hadoop configuration
Hadoop provides some helper tools to simplify its startup. These tools are categorized as start (such as start-dfs) and stop (such as stop-dfs). The following short script illustrates how to start the Hadoop node:
# /usr/lib/hadoop-0.20/bin/start-dfs.sh
# /usr/lib/hadoop-0.20/bin/start-mapred.sh
#
To verify that the daemons are running, you can use the
jps command (which is a
ps utility for JVM processes). This command
lists the five daemons and their process identifiers.
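If you'd rather not eyeball the list, a short filter can report which daemons are absent. This sketch assumes that jps prints one "pid name" pair per line and that the daemons appear under the class names NameNode, SecondaryNameNode, DataNode, JobTracker, and TaskTracker; the function name missing_daemons and the sample process IDs are made up for illustration.

```shell
#!/bin/sh
# Read jps-style output ("<pid> <name>" per line) on stdin and print any of
# the five expected daemon names that do not appear.
missing_daemons() {
    names="$(awk '{print $2}')"
    for d in NameNode SecondaryNameNode DataNode JobTracker TaskTracker; do
        # -x matches the whole line, so NameNode does not match SecondaryNameNode
        printf '%s\n' "$names" | grep -qx "$d" || printf '%s\n' "$d"
    done
}

# Illustrative sample (PIDs invented) in which the tasktracker is not running.
sample='4001 NameNode
4002 SecondaryNameNode
4003 DataNode
4004 JobTracker
4010 Jps'
printf '%s\n' "$sample" | missing_daemons
```

Against a live node, you would run jps | missing_daemons; empty output means all five names were found.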
Now that the Hadoop daemons are running, let's look back at them to introduce what each accomplishes in the Hadoop framework. The namenode is the master server in Hadoop and manages the file system namespace and access to the files stored in the cluster. There's also a secondary namenode, which isn't a redundant daemon for the namenode but instead provides periodic checkpointing and housekeeping tasks. You'll find one namenode and one secondary namenode in a Hadoop cluster.
The datanode manages the storage attached to a node; a cluster can contain many such nodes. Each node storing data will have a datanode daemon running.
Finally, each cluster will have a single jobtracker that is responsible for scheduling work at the datanodes and a tasktracker per datanode that performs the actual work. The jobtracker and tasktracker behave in a master-slave arrangement, where the jobtracker distributes work across the datanodes and the tasktracker performs the task. The jobtracker also validates requested work and, if a datanode fails for some reason, reschedules the previous task.
In this simple configuration, all daemons reside on the same node (see Figure 1). But from the previous discussion, it's easy to see how Hadoop provides parallel processing of work. Although the architecture is simple, Hadoop provides an easy way to distribute data, balance load, and process large amounts of data in parallel in a fault-tolerant way.
You can perform a couple of tests to ensure that Hadoop is up and running
normally (at least the namenode). Knowing that all of your processes are
available, you can use the
hadoop command to
inspect the local namespace (see Listing 2).
Listing 2. Checking access to the HDFS
# hadoop-0.20 fs -ls /
Found 2 items
drwxr-xr-x - root supergroup 0 2010-04-29 16:38 /user
drwxr-xr-x - root supergroup 0 2010-04-29 16:28 /var
#
From this, you can see that the namenode is up and able to service the
local namespace. Notice that you're using a command called
hadoop-0.20 to inspect the file system. This
utility is how you interact with the Hadoop cluster, from inspecting the
file system to running jobs in the cluster. Note the command structure here: After specifying the hadoop-0.20 utility, you define a command (in this case, the generic file system shell) and one or more options (in this case, you request a file list using ls). As the hadoop-0.20 utility is one of your primary interfaces to the Hadoop cluster, you'll see it used quite a bit through this article. Listing 3 provides some additional file system operations with which you can explore this interface a bit further (creating a new subdirectory called test, listing its contents, and then removing it).
Listing 3. Exploring file system manipulation in Hadoop
# hadoop-0.20 fs -mkdir test
# hadoop-0.20 fs -ls test
# hadoop-0.20 fs -rmr test
Deleted hdfs://localhost/user/root/test
#
Now that you have installed Hadoop and tested the basic interface to its file system, it's time to test Hadoop in a real application. In this example, you see the MapReduce process on a small set of data. Map and reduce are named after functions in functional programming, and together they provide the core capability for data reduction. Map refers to the process of chopping input into a set of smaller sub-problems for processing (where these sub-problems are distributed to parallel workers). Reduce refers to the assembly of answers from the sub-problems into a single set of output. Note that I haven't defined what processing means here, as the framework permits you to define this yourself. The canonical MapReduce example is the calculation of word frequency in a set of documents.
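To get a feel for the two phases before involving Hadoop, you can mimic word frequency with ordinary shell tools. This is only a local, single-process analogy, not how Hadoop implements the job: the function names are hypothetical, the sort in the middle stands in for the step that brings identical keys together, and nothing here runs in parallel.

```shell
#!/bin/sh
# Local, single-process analogue of word count (not Hadoop itself).

# map: split the input into words and emit one word per line
wc_map() { tr -s '[:space:]' '\n' | grep -v '^$'; }

# shuffle: bring identical words next to each other
wc_shuffle() { LC_ALL=C sort; }

# reduce: collapse each run of identical words into "word<TAB>count"
wc_reduce() { uniq -c | awk '{printf "%s\t%s\n", $2, $1}'; }

# Example: two short "documents" fed through the three stages.
printf 'the cat saw the dog\nthe dog ran\n' | wc_map | wc_shuffle | wc_reduce
```

In a real job, many map tasks would each handle a slice of the input, and the framework would route all occurrences of a given word to the same reduce task.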
Per the previous discussion, you'll have a set of inputs and a resulting set of outputs. The first step is to create an input subdirectory in the file system into which you'll drop your work. You do this using:
# hadoop-0.20 fs -mkdir input
Next, drop some work in the input subdirectory. In this case, use the put command, which copies a file from the local file system into the HDFS (see Listing 4). Note the format below, which gives the local source file first and the HDFS subdirectory (input) second. Once done, you'll have two text files in HDFS ready to be processed.
Listing 4. Moving files into HDFS
# hadoop-0.20 fs -put /usr/src/linux-source-2.6.27/Doc*/memory-barriers.txt input
# hadoop-0.20 fs -put /usr/src/linux-source-2.6.27/Doc*/rt-mutex-design.txt input
#
Next, you can check for the presence of the files using the
ls command (see Listing 5).
Listing 5. Checking files in HDFS
# hadoop-0.20 fs -ls input
Found 2 items
-rw-r--r-- 1 root supergroup 78031 2010-04-29 17:35 /user/root/input/memory-barriers.txt
-rw-r--r-- 1 root supergroup 33567 2010-04-29 17:36 /user/root/input/rt-mutex-design.txt
#
With your work safely in HDFS, you can perform the MapReduce function. This function requires a single command but a long request, as shown in Listing 6. This command requests the execution of a JAR. The JAR actually implements a number of capabilities, but this example focuses on wordcount. The jobtracker daemon requests that the datanode perform the MapReduce job, which results in a considerable amount of output (smaller here, because you're only processing two files). The output shows the progress of the map and reduce functions, and then provides some useful statistics regarding the I/O for both the file system and records.
Listing 6. Performing a MapReduce job for word frequency (wordcount)
# hadoop-0.20 jar /usr/lib/hadoop-0.20/hadoop-0.20.2+228-examples.jar \
    wordcount input output
10/04/29 17:36:49 INFO input.FileInputFormat: Total input paths to process : 2
10/04/29 17:36:49 INFO mapred.JobClient: Running job: job_201004291628_0009
10/04/29 17:36:50 INFO mapred.JobClient: map 0% reduce 0%
10/04/29 17:37:00 INFO mapred.JobClient: map 100% reduce 0%
10/04/29 17:37:06 INFO mapred.JobClient: map 100% reduce 100%
10/04/29 17:37:08 INFO mapred.JobClient: Job complete: job_201004291628_0009
10/04/29 17:37:08 INFO mapred.JobClient: Counters: 17
10/04/29 17:37:08 INFO mapred.JobClient:   Job Counters
10/04/29 17:37:08 INFO mapred.JobClient:     Launched reduce tasks=1
10/04/29 17:37:08 INFO mapred.JobClient:     Launched map tasks=2
10/04/29 17:37:08 INFO mapred.JobClient:     Data-local map tasks=2
10/04/29 17:37:08 INFO mapred.JobClient:   FileSystemCounters
10/04/29 17:37:08 INFO mapred.JobClient:     FILE_BYTES_READ=47556
10/04/29 17:37:08 INFO mapred.JobClient:     HDFS_BYTES_READ=111598
10/04/29 17:37:08 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=95182
10/04/29 17:37:08 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=30949
10/04/29 17:37:08 INFO mapred.JobClient:   Map-Reduce Framework
10/04/29 17:37:08 INFO mapred.JobClient:     Reduce input groups=2974
10/04/29 17:37:08 INFO mapred.JobClient:     Combine output records=3381
10/04/29 17:37:08 INFO mapred.JobClient:     Map input records=2937
10/04/29 17:37:08 INFO mapred.JobClient:     Reduce shuffle bytes=47562
10/04/29 17:37:08 INFO mapred.JobClient:     Reduce output records=2974
10/04/29 17:37:08 INFO mapred.JobClient:     Spilled Records=6762
10/04/29 17:37:08 INFO mapred.JobClient:     Map output bytes=168718
10/04/29 17:37:08 INFO mapred.JobClient:     Combine input records=17457
10/04/29 17:37:08 INFO mapred.JobClient:     Map output records=17457
10/04/29 17:37:08 INFO mapred.JobClient:     Reduce input records=3381
With the processing complete, inspect the result. Recall that the point of
the job is to calculate the number of times words occurred in the input
files. This output is emitted as a file of tuples, representing the word
and the number of times it appeared in the input. You can use the
cat command (after finding the particular
output file) through the
hadoop-0.20 utility to
emit this data (see Listing 7).
Listing 7. Reviewing the output from the MapReduce wordcount operation
# hadoop-0.20 fs -ls /user/root/output
Found 2 items
drwxr-xr-x - root supergroup 0 2010-04-29 17:36 /user/root/output/_logs
-rw-r--r-- 1 root supergroup 30949 2010-04-29 17:37 /user/root/output/part-r-00000
#
# hadoop-0.20 fs -cat output/part-r-00000 | head -13
!= 1
"Atomic 2
"Cache 2
"Control 1
"Examples 1
"Has 7
"Inter-CPU 1
"LOAD 1
"LOCK" 1
"Locking 1
"Locks 1
"MMIO 1
"Pending 5
#
You can also extract the file from HDFS using the hadoop-0.20 utility (see Listing 8). You do this easily with the get utility (analogous to the put you executed earlier to write files into HDFS). For the get operation, specify the file in HDFS to extract (from your output subdirectory) and the file name to write in the local file system (output.txt).
Listing 8. Extracting the output from HDFS
# hadoop-0.20 fs -get output/part-r-00000 output.txt
# cat output.txt | head -5
!= 1
"Atomic 2
"Cache 2
"Control 1
"Examples 1
#
Let's look at another example using the same JAR but a different use (here,
you'll explore a parallel
grep). For this test,
use your existing input files but remove the output subdirectory to
recreate it for this test:
# hadoop-0.20 fs -rmr output
Deleted hdfs://localhost/user/root/output
Next, request the MapReduce job for grep. In this case, the grep is performed in parallel (the map), and then the grep results are combined (the reduce). Listing 9 provides the output for this use model (but in this case, some of the output has been pruned for brevity). Note the command request here, where your request is a grep taking input from the subdirectory called input and placing the result in a subdirectory called output. The final parameter is the string you're searching for (in this case, 'kernel').
Listing 9. Performing a MapReduce Job for word search count (grep)
# hadoop-0.20 jar /usr/lib/hadoop/hadoop-0.20.2+228-examples.jar \
    grep input output 'kernel'
10/04/30 09:22:29 INFO mapred.FileInputFormat: Total input paths to process : 2
10/04/30 09:22:30 INFO mapred.JobClient: Running job: job_201004291628_0010
10/04/30 09:22:31 INFO mapred.JobClient: map 0% reduce 0%
10/04/30 09:22:42 INFO mapred.JobClient: map 66% reduce 0%
10/04/30 09:22:45 INFO mapred.JobClient: map 100% reduce 0%
10/04/30 09:22:54 INFO mapred.JobClient: map 100% reduce 100%
10/04/30 09:22:56 INFO mapred.JobClient: Job complete: job_201004291628_0010
10/04/30 09:22:56 INFO mapred.JobClient: Counters: 18
10/04/30 09:22:56 INFO mapred.JobClient:   Job Counters
10/04/30 09:22:56 INFO mapred.JobClient:     Launched reduce tasks=1
10/04/30 09:22:56 INFO mapred.JobClient:     Launched map tasks=3
10/04/30 09:22:56 INFO mapred.JobClient:     Data-local map tasks=3
10/04/30 09:22:56 INFO mapred.JobClient:   FileSystemCounters
10/04/30 09:22:56 INFO mapred.JobClient:     FILE_BYTES_READ=57
10/04/30 09:22:56 INFO mapred.JobClient:     HDFS_BYTES_READ=113144
10/04/30 09:22:56 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=222
10/04/30 09:22:56 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=109
...
10/04/30 09:23:14 INFO mapred.JobClient:     Map output bytes=15
10/04/30 09:23:14 INFO mapred.JobClient:     Map input bytes=23
10/04/30 09:23:14 INFO mapred.JobClient:     Combine input records=0
10/04/30 09:23:14 INFO mapred.JobClient:     Map output records=1
10/04/30 09:23:14 INFO mapred.JobClient:     Reduce input records=1
#
With the job complete, inspect the output directory (to identify the results file), and then perform a file system cat operation to view its contents (see Listing 10).
Listing 10. Inspecting the output of the MapReduce job
# hadoop-0.20 fs -ls output
Found 2 items
drwxr-xr-x - root supergroup 0 2010-04-30 09:22 /user/root/output/_logs
-rw-r--r-- 1 root supergroup 10 2010-04-30 09:23 /user/root/output/part-00000
# hadoop-0.20 fs -cat output/part-00000
17 kernel
#
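As a sanity check on a result like this, you can approximate the same count locally with standard tools. The sketch below is an analogy rather than the example JAR's actual logic: count_matches is a hypothetical name, and it relies on the -o option of grep (available in GNU and BSD grep, though not required by POSIX) to emit each match on its own line. It prints "count match" pairs, highest count first, and assumes the matched text contains no spaces.

```shell
#!/bin/sh
# Count every occurrence of an extended regular expression on stdin and print
# "<count> <matched text>" lines, most frequent first.
count_matches() {
    grep -oE -e "$1" | LC_ALL=C sort | uniq -c | sort -rn | awk '{print $1, $2}'
}

# Example: three occurrences spread across two lines.
printf 'kernel a kernel\nb kernel\n' | count_matches kernel
```

To compare with the job, you could pipe the two input files through the function, for example cat memory-barriers.txt rt-mutex-design.txt | count_matches kernel from the directory that holds the local copies.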
You've seen how to inspect the HDFS, but if you're looking for information about Hadoop's operation, you'll find the Web interfaces useful. Recall that at the top of the Hadoop cluster is the namenode, which manages the HDFS. You can explore high-level details of the file system (such as available and used space and available datanodes) as well as the running jobs through http://localhost:50070. You can dig deeper into the jobtracker (job status) through http://localhost:50030. Note that in both of these cases, you reference localhost, because all daemons are running on the same host.
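If you're working on a host without a browser, you can at least confirm that both interfaces answer. This is a rough sketch under a few assumptions: the function names are hypothetical, the ports are the two mentioned above, and the probe uses curl (already a prerequisite for this article) to report the HTTP status code, which curl prints as 000 when it cannot connect.

```shell
#!/bin/sh
# Print "<name> <url>" for the two web interfaces on a given host.
hadoop_ui_urls() {
    host="${1:-localhost}"
    printf 'namenode http://%s:50070\n' "$host"
    printf 'jobtracker http://%s:50030\n' "$host"
}

# Request each URL and print the HTTP status code next to it.
probe_ui() {
    hadoop_ui_urls "$1" | while read -r name url; do
        code="$(curl -s -o /dev/null -m 5 -w '%{http_code}' "$url" || true)"
        printf '%s %s %s\n' "$name" "$url" "$code"
    done
}

# Example: list the URLs that probe_ui would check on this host.
hadoop_ui_urls localhost
```

Running probe_ui localhost after the daemons are up should show a success status for each interface; a 000 suggests that the corresponding daemon is not listening.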
This article explored the installation and initial configuration of a simple (pseudo-distributed) Hadoop cluster (in this case, using Cloudera's distribution for Hadoop). I chose this particular distribution because it simplified the installation and initial configuration of Hadoop. You can find a number of distributions for Hadoop (including the source) at apache.org. See the Related topics section for more information.
But what if you lack the hardware resources to scale your Hadoop cluster for your specific needs? It turns out that Hadoop is so popular, you can easily run it within cloud computing infrastructures using pre-built Hadoop VMs and leased servers. Amazon provides Amazon Machine Images (AMIs) as well as compute resources within the Amazon Elastic Compute Cloud (Amazon EC2). Additionally, Microsoft recently announced coming support for Hadoop within its Windows® Azure Services Platform.
From this article, it's easy to see how Hadoop makes distributed computing simple for processing large datasets. The next article in this series will explore how to configure Hadoop in a multi-node cluster with additional examples. See you then!
Related topics
- Yahoo!'s Doug Cutting (now of Cloudera) developed Hadoop to support distribution of the Nutch search engine.
- The main site for Hadoop development is an Apache project.
- Cloudera provides pre-packaged Hadoop and VMs, making it simple to get started.
- See the license that Google recently granted Hadoop, making it safe to use Hadoop without worry of infringement. Google holds a patent to the ideas for Hadoop (efficient, large-scale processing of data, defined under patent 7,650,331).
- In "Distributed computing with Linux and Hadoop" (developerWorks, December 2008) and the more recent "Cloud computing with Linux and Apache Hadoop" (developerWorks, October 2009), learn more about Hadoop and its architecture.
- Read about MapReduce: Simplified Data Processing on Large Clusters in Google's seminal paper on this functional style of programming.
- See Wikipedia's introduction to MapReduce.
- See the commands for the Hadoop utility at the Apache site.
- IBM InfoSphere BigInsights Basic Edition -- IBM's Hadoop distribution -- is an integrated, tested and pre-configured, no-charge download for anyone who wants to experiment with and learn about Hadoop.
- Find free courses on Hadoop fundamentals, stream computing, text analytics, and more at Big Data University.
- Learn about Hadoop on the horizon and IBM's jStart (emerging technologies division). At jStart, you can also learn about BigSheets, IBM's mashup to extend business intelligence through Web data.
- Download IBM InfoSphere BigInsights Basic Edition at no charge and build a solution that turns large, complex volumes of data into insight by combining Apache Hadoop with unique technologies and capabilities from IBM.
- Cloudera offers a number of free (and stable!) Hadoop distributions. You can download a distribution for a variety of Linux distributions, a VM, and even a VM for Amazon EC2.
- The Windows Azure cloud is preparing to provide Hadoop as a VM, as of this writing, and Amazon provides machine images for Hadoop within its scalable Amazon EC2 infrastructure.
- In the developerWorks Linux zone, find hundreds of how-to articles and tutorials, as well as downloads, discussion forums, and a wealth of other resources for Linux developers and administrators.
- Evaluate IBM products in the way that suits you best: Download a product trial, try a product online, use a product in a cloud environment, or spend a few hours in the SOA Sandbox learning how to implement Service Oriented Architecture efficiently.
- Follow developerWorks on Twitter, or subscribe to a feed of Linux tweets on developerWorks.