Hadoop is great for processing large quantities of data and resolving that information down into a smaller set of information that you can query. However, the processing time can be huge. By integrating with Couchbase Server, you can do live querying and reporting on information while continuing to work with Hadoop for the large data set and the heavy processing of that data. Couchbase Server also uses a MapReduce querying system, which makes it easy for you to migrate and integrate your indexing and querying system to extract and manipulate the information effectively.

Martin Brown, VP of Technical Publications, Couchbase

A professional writer for over 15 years, Martin 'MC' Brown is the author of and contributor to over 26 books covering an array of topics, including the recently published Getting Started with CouchDB. His expertise spans myriad development languages and platforms: Perl, Python, Java, JavaScript, Basic, Pascal, Modula-2, C, C++, Rebol, Gawk, Shellscript, Windows, Solaris, Linux, BeOS, Microsoft® WP, Mac OS and more. He is a former LAMP Technologies Editor for LinuxWorld magazine and is a regular contributor to ServerWatch.com, LinuxPlanet, ComputerWorld, and IBM developerWorks. He draws on a rich and varied background as a founder member of a leading UK ISP, systems manager and IT consultant for an advertising agency and Internet solutions group, technical specialist for an intercontinental ISP network, database designer and programmer, and as a self-confessed compulsive consumer of computing hardware and software. MC is currently the VP of Technical Publications and Education for Couchbase and is responsible for all published documentation, training program and content, and the Couchbase Techzone.



11 September 2012


Hadoop and data processing

Hadoop combines a number of key features that ultimately make it very useful for processing a large quantity of data down into smaller, usable chunks.

The primary component is the HDFS file system, which allows for information to be distributed across a cluster. The information stored in this distributed format can also be processed individually on each cluster node through a system called MapReduce. The MapReduce process converts the information stored in the HDFS file system into smaller, processed and more manageable chunks.

Because Hadoop works on multiple nodes, it can be used to process vast quantities of input information and simplify it into more usable blocks of information. This processing is handled by using a simple MapReduce system.

MapReduce is a way of taking the incoming information, which may or may not be in a structured format, and converting it into a structure that can be more easily used, queried, and processed.

For example, a typical usage is to process log information from hundreds of different applications so that you can identify specific problems, counts or other events. By using the MapReduce format, you can start to measure and look for trends — translating what would otherwise be a very significant quantity of information into a smaller size. When looking at the logs of a web server, for example, you might want to look at the errors that occur within a specific range on specific pages. You can write a MapReduce function to identify specific errors on individual pages, and generate that information in the output. Using this method, you can reduce many lines of information from the log files into a much smaller collection of records containing only the error information.

Understanding MapReduce

MapReduce works in two phases. The map process takes the incoming information and maps it into a standardized format. For some information types, this mapping can be direct and explicit. For example, if you are processing input data such as a web log, you might extract a single column of data from the text of the log. For other data, the mapping might be more complex. When processing textual information, such as research papers, you might extract phrases or more complex blocks of data.

The reduce phase is used to collate and summarize the data together. The reduction can actually take place in a number of different ways, but the typical process is to perform a basic count, sum, or other statistic based on the individual data from the map phase.

Thinking about a simple example, such as the word count used as sample MapReduce in Hadoop, the map phase breaks apart raw text to identify individual words, and for each word, generates a block of output data. The reduce function then takes these blocks of mapped information, and reduces them down to increment the count for each unique word seen. Given a single text file of 100 words, the map process would generate 100 blocks of data, but the reduce phase would summarize this down to provide a count of each unique word into, say, 56 words, with a count of the number of times each word appeared.

With web logs, the map would take the input data, create a record for each error within the log file, and generate a block for each error that contains the date, time, and page that caused the problem.
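As a rough illustration of that idea, here is a hypothetical sketch of such a map function, using the same old-style org.apache.hadoop.mapred API as the word-count example later in this article. The log layout and field positions are assumptions for illustration only; they are not part of the original example.

public static class ErrorMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
                    Reporter reporter) throws IOException {
        // Assume a space-separated, common-log-style line:
        // host ident user [date:time zone] "request" status bytes
        String[] fields = value.toString().split(" ");
        if (fields.length > 8) {
            String when = fields[3].replace("[", "");   // date and time
            String page = fields[6];                    // requested page
            String status = fields[8];                  // HTTP status code
            // Emit one record per error response (4xx or 5xx).
            if (status.startsWith("4") || status.startsWith("5")) {
                output.collect(new Text(page), new Text(when + " " + status));
            }
        }
    }
}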

Within Hadoop, the MapReduce phases take place on the individual nodes on which the individual blocks of source information are stored. This is what enables Hadoop to work with such large data sets of information — by allowing multiple nodes to work on the data simultaneously. With 100 nodes, for example, you could process 100 log files simultaneously and simplify many gigabytes (or terabytes) of information much quicker than could be achieved through a single node.

Hadoop limitations

One of the major limitations of the core Hadoop product is that there is no way to store and query information as you would in a database. Data is added into the HDFS system, but you cannot ask Hadoop to return a list of all the data matching specific criteria. The primary reason for this is that Hadoop doesn't store, structure, or understand the structure of the data that is being stored within HDFS. This is why the MapReduce system is required to parse and process the information into a more structured format.

However, we can combine the processing power of Hadoop with a more traditional database so that we can query the data that Hadoop has generated through its own MapReduce system. There are many possible solutions available, including many traditional SQL databases, but we can keep the MapReduce theme, which is very effective for large data sets, by using Couchbase Server.

The basic structure of the data sharing between the systems is shown in Figure 1.

Figure 1. Basic structure of the data sharing between the systems

Installing Hadoop

If you haven't installed Hadoop already, the easiest way is to make use of one of the Cloudera installations. For compatibility between Hadoop, Sqoop, and Couchbase, the best solution is to make use of the CDH3 installation (see Resources). For this, you will need to use Ubuntu 10.10 to 11.10. Later Ubuntu releases have introduced an incompatibility because they no longer support a package required by the Cloudera Hadoop installation.

Before installation, make sure you have installed a Java™ virtual machine, and ensure that you've configured the correct home directory of your JDK in the JAVA_HOME variable. Note that you must have a full Java Development Kit available, not just a Java Runtime Environment (JRE), as Sqoop compiles code to export and import data between Couchbase Server and Hadoop.

To install CDH3 on Ubuntu and similar systems, do the following:

  1. Download the CDH3 configuration package. This adds the configuration for the CDH3 source files to the apt repository.
  2. Update your repository cache: $ apt-get update.
  3. Install the main Hadoop package: $ apt-get install hadoop-0.20.
  4. Install the Hadoop components (see Listing 1).
    Listing 1. Installing the Hadoop components
    $ for comp in namenode datanode secondarynamenode jobtracker tasktracker
    do
    apt-get install hadoop-0.20-$comp
    done
  5. Edit the configuration files to ensure you've set up the core components.
  6. Edit /etc/hadoop/conf/core-site.xml to read as shown in Listing 2.
    Listing 2. Edited /etc/hadoop/conf/core-site.xml file
    <configuration>
      <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:9000</value>
      </property>
    </configuration>

    This configures the default HDFS location for storing data.
    Edit /etc/hadoop/conf/hdfs-site.xml (see Listing 3).
    Listing 3. Edited /etc/hadoop/conf/hdfs-site.xml file
    <configuration>
      <property>
        <name>dfs.replication</name>
        <value>1</value>
      </property>
    </configuration>

    This sets the replication factor for the stored data (here, a single copy).
    Edit /etc/hadoop/conf/mapred-site.xml (see Listing 4).
    Listing 4. Edited /etc/hadoop/conf/mapred-site.xml file
    <configuration>
      <property>
        <name>mapred.job.tracker</name>
        <value>localhost:9001</value>
      </property>
    </configuration>

    This configures the job tracker for MapReduce.
  7. Finally, edit the Hadoop environment to correctly point to the directory of your JDK installation within the /usr/lib/hadoop/conf/hadoop-env.sh. There will be a commented out line for the JAVA_HOME variable. You should uncomment it and set it to your JDK location. For example: export JAVA_HOME=/usr/lib/jvm/java-1.6.0-openjdk.
  8. Now, start up Hadoop on your system. The easiest way is to use the start-all.sh script: $ /usr/lib/hadoop/bin/start-all.sh.

Assuming everything is configured correctly, you should now have a running Hadoop system.


Couchbase Server overview

Couchbase Server is a clustered, document-based database system that makes use of a caching layer to provide very fast access to your data by storing the majority of your data in RAM. The system makes use of multiple nodes and a caching layer with automatic sharding across the cluster. This allows for an elastic nature so that you can grow and shrink the cluster to take advantage of more RAM or disk I/O to help improve performance.

All data in Couchbase Server is eventually persisted down to disk, but initially the writes and updates operate through the caching layer, which is what provides the high performance, and which we can exploit when processing Hadoop data to get live information and query the contents.

In its basic form, Couchbase Server remains a document and key/value-based store: you can only retrieve information from the cluster if you know the document ID. In Couchbase Server 2.0, you can store documents in JSON format, and then use the view system to create a view on the stored JSON documents. A view is a MapReduce combination that is executed over the documents stored in the database. The output from a view is an index, matching the structure you've defined through the MapReduce functions. The existence of the index provides you with the ability to query the underlying document data.

We can use this functionality to take the processed data from Hadoop, store that information within Couchbase Server, and then use it as our basis for querying that data. Conveniently, Couchbase Server uses a MapReduce system for processing the documents and creating the indexes. This provides some level of compatibility and consistency with the methods for processing the data.

Installing Couchbase Server

Installing Couchbase Server is easy. Download the Couchbase Server 2.0 release from the Couchbase website for your platform (see Resources), and install the package using dpkg or RPM (depending on your platform).

After installation, Couchbase Server will start automatically. To configure it, open a web browser and point it to localhost:8091 on your machine (or access it remotely using the IP address of the machine).

Follow the on-screen configuration instructions. You can use most of the default settings as provided during the installation, but the most important settings are the location of the data files for the data written into the database, and the amount of RAM you allocate to Couchbase Server.


Getting Couchbase Server to talk to the Hadoop connector

Couchbase Server uses the Sqoop connector to communicate with your Hadoop cluster. Sqoop provides a connection to transfer data in bulk between Hadoop and Couchbase Server.

Technically, Sqoop is an application designed to convert information between structured databases and Hadoop. The name Sqoop is actually derived from SQL and Hadoop.

Installing Sqoop

If you are using the CDH3 installation, you can install sqoop by using your package manager: $ sudo apt-get install sqoop.

This will install sqoop in /usr/lib/sqoop.

Note: A recent bug in Sqoop means that it will sometimes try to transfer the wrong datasets. The fix is part of Sqoop Version 1.4.2. If you experience problems, try V1.4.2 or a later version.

Installing the Couchbase Hadoop Connector

The Couchbase Hadoop Connector is a collection of Java jar files that support the connectivity between Sqoop and Couchbase. Download the Hadoop connector from the Couchbase website (see Resources). The file is packaged as a zip file. Unzip it, and then run the install.sh script inside, supplying the location of the Sqoop system. For example: $ sudo bash install.sh /usr/lib/sqoop.

That installs all the necessary library and configuration files. Now we can start exchanging information between the two systems.

Importing data from Couchbase Server to Hadoop

Although it's not the scenario we will deal with directly here, it's worth noting that we can export data from Couchbase Server into Hadoop. This could be useful if you had loaded a large quantity of data into Couchbase Server and wanted to take advantage of Hadoop to process and simplify it. To do this, you can load the entire data set from Couchbase Server into a Hadoop file within HDFS using: $ sqoop import --connect http://192.168.0.71:8091/pools --table cbdata.

The URL provided here is the location of the Couchbase Server bucket pool. The table specified here is actually the name of the directory within HDFS where the data will be stored.

The data itself is stored as a key/value dump of information from Couchbase Server. In Couchbase Server 2.0, this means that the data is written out using the unique document ID as the key, with the JSON value of the record as the content.

Writing JSON data in Hadoop MapReduce

For exchanging information between Hadoop and Couchbase Server, we need to speak a common language -- in this case, JSON (see Listing 5).

Listing 5. Outputting JSON within Hadoop MapReduce
package org.mcslp;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import com.google.gson.*;

public class WordCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, Text> {

        class wordRecord {
            private String word;
            private int count;
            wordRecord() {
            }
        }

        public void reduce(Text key,
                           Iterator<IntWritable> values,
                           OutputCollector<Text, Text> output,
                           Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }

            wordRecord word = new wordRecord();
            word.word = key.toString();
            word.count = sum;

            Gson json = new Gson();
            System.out.println(json.toJson(word));
            output.collect(key, new Text(json.toJson(word)));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        // The map phase emits IntWritable counts, but the reduce phase emits
        // Text (JSON) values, so the two value classes must be set separately.
        conf.setMapOutputValueClass(IntWritable.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

The code is a modification of the word counting sample provided with the Hadoop distribution.

This version uses the Google Gson library for writing JSON information from the reduce phase of the processing. For convenience, a new class (wordRecord) is used; Gson converts it into a JSON record, which is the format we require on a document-by-document basis for Couchbase Server to process and parse the contents.

Note that we do not define a Combiner class for Hadoop. This prevents Hadoop from trying to re-reduce the information, which would fail with the current code because our reduce function takes in a word and a single count and outputs a JSON value. For a secondary reduce/combine stage, we would need to parse the JSON input or define a new Combiner class that outputs the JSON version of the information. Skipping the combiner simplifies the definition slightly.
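If a combine stage were wanted, one approach (a hypothetical sketch, not part of the original example) is a separate combiner whose output types match the map output (Text and IntWritable), so that it performs partial sums and leaves the JSON conversion to the final reduce phase. It would be registered with conf.setCombinerClass(Combine.class):

public static class Combine extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    // Sum the counts locally on each node; the output types match the map
    // output, so the JSON-producing reduce phase still receives plain counts.
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}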

To use this within Hadoop, you first need to copy the Google Gson library into the Hadoop directory (/usr/lib/hadoop/lib). Then restart Hadoop to ensure that the library has been correctly identified by Hadoop.

Next, compile your code into a directory (the wordcount_classes directory must already exist): $ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar:./google-gson-2.2.1/gson-2.2.1.jar -d wordcount_classes WordCount.java

Now create a jar of your library: $ jar -cvf wordcount.jar -C wordcount_classes/ .

With this completed, you can copy a number of text files into a directory, and then use this jar to process the text files into a count of the individual words, with a JSON record containing the word, and the count. For example, to process this data on some Project Gutenberg texts: $ hadoop jar wordcount.jar org.mcslp.WordCount /user/mc/gutenberg /user/mc/gutenberg-output.

This will generate a list of words in the output directory, counted by the MapReduce function within Hadoop.
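Because the job uses TextOutputFormat with a Text key and a Text (JSON) value, each line in the output files (part-00000 and so on) pairs the word with its JSON record, separated by a tab. Illustrative lines, using values that appear later in this article:

    breeze	{"word":"breeze","count":2}
    beware	{"word":"beware","count":5}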

Exporting the data from Hadoop to Couchbase Server

To get the data back from Hadoop and into Couchbase Server, we need to use Sqoop to export the data back: $ sqoop export --connect http://10.2.1.55:8091/pools --table ignored --export-dir gutenberg-output.

The --table argument in this example is ignored, but the --export-dir is the name of the directory where the information to be exported is located.


Writing MapReduce in Couchbase Server

Within Hadoop, MapReduce functions are written using Java. Within Couchbase Server, the MapReduce functionality is written in JavaScript. Because JavaScript is interpreted, you do not need to compile the view, and you can easily edit and refine the MapReduce structure.

To create a view within Couchbase Server, open the admin console (on http://localhost:8091), and then click the View button. Views are collected within a design document. You can create multiple views in a single design document and create multiple design documents. To improve the overall performance of the server, the system also supports a development view that can be edited, and a production view that cannot be edited. The production view cannot be edited because doing so would invalidate the view index and cause the index to require rebuilding.

Click the Create Development View button and name your design document and view.

Within Couchbase Server, there are the same two functions: map and reduce. The map function is used to map the input data (JSON documents) to a table. The reduce function is then used to summarize and reduce the mapped data. Reduce functions are optional and are not required for the index functionality, so we'll ignore reduce functions for the purposes of this article.

For the map function, the format of the function is shown in Listing 6.

Listing 6. Format of the map function
map(doc) { 

}

The argument doc is each stored JSON document. The storage format for Couchbase Server is a JSON document and the view is in JavaScript, so we can access a field in JSON called count using: doc.count.

To emit information from our map function, you call the emit() function. The emit() function takes two arguments, the first is the key, which is used to select and query information, and the second argument is the corresponding value. Thus we can create a map function that outputs the word and the count using the code in Listing 7.

Listing 7. map function that outputs the word and the count
function (doc) {
  if (doc.word) {
  	emit(doc.word,doc.count);
  }
}

This will output a row of data for each input document, containing the document ID (actually our word), the word as a key, and the count of occurrences of that word in the source text. You can see the raw JSON output in Listing 8.

Listing 8. The raw JSON output
{"total_rows":113,"rows":[
{"id":"acceptance","key":"acceptance","value":2},
{"id":"accompagner","key":"accompagner","value":1},
{"id":"achieve","key":"achieve","value":1},
{"id":"adulteration","key":"adulteration","value":1},
{"id":"arsenic","key":"arsenic","value":2},
{"id":"attainder","key":"attainder","value":1},
{"id":"beerpull","key":"beerpull","value":2},
{"id":"beware","key":"beware","value":5},
{"id":"breeze","key":"breeze","value":2},
{"id":"brighteyed","key":"brighteyed","value":1}
]
}

In the output, id is the document ID, key is the key you specified in the emit statement, and value is the value specified in the emit statement.


Getting live data

Now that we have processed the information in Hadoop, imported it into Couchbase Server, and created a view on that data within Couchbase Server, we can begin to query the information that we have processed and stored. Views are accessed using a REST-like API or, if you are using one of the Couchbase Server SDKs, through the corresponding view querying functions.

Querying is possible by three main selections:

  • Individual key. For example, showing the information matching a specific key, such as 'unkind'.
  • List of keys. You can supply an array of key values, and this will return all records where the view key matches one of the supplied values. For example, ['unkind','kind'] would return records matching either word.
  • Range of keys. You can specify a start and end key.

For example, to find the count for a specified word, you use the key argument to the view:

http://192.168.0.71:8092/words/_design/dev_words/_view/byword?connection_timeout=60000&limit=10&skip=0&key=%22breeze%22
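
To match a list of keys (the second selection type above), you can supply the keys parameter as a URL-encoded JSON array. A sketch of such a request, following the same pattern (with long lists of keys, the array is normally supplied in the body of a POST request instead):

http://192.168.0.71:8092/words/_design/dev_words/_view/byword?connection_timeout=60000&limit=10&skip=0&keys=%5B%22unkind%22,%22kind%22%5D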

Couchbase Server naturally outputs the results of a MapReduce in UTF-8 order, sorted by the specified key. This means that you can get a range of values by specifying the start and end keys. For example, to get all the words between 'breeze' and 'kind':

http://192.168.0.71:8092/words/_design/dev_words/_view/byword?connection_timeout=60000&limit=10&skip=0&startkey=%22breeze%22&endkey=%22kind%22

The querying is simple, but very powerful, especially when you realize that you can combine it with the flexible view system to generate data in the format you want.
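
If you are working through one of the SDKs rather than the REST interface, the same query can be issued programmatically. The following is a minimal sketch assuming the 1.x-era Couchbase Java SDK (the CouchbaseClient, Query, and ComplexKey classes shown here may differ in name or signature in other SDK versions) and the words bucket and dev_words design document used above:

import java.net.URI;
import java.util.Arrays;

import com.couchbase.client.CouchbaseClient;
import com.couchbase.client.protocol.views.ComplexKey;
import com.couchbase.client.protocol.views.Query;
import com.couchbase.client.protocol.views.View;
import com.couchbase.client.protocol.views.ViewResponse;
import com.couchbase.client.protocol.views.ViewRow;

public class WordQuery {
    public static void main(String[] args) throws Exception {
        // Connect to the cluster and the bucket that holds the word counts.
        CouchbaseClient client = new CouchbaseClient(
                Arrays.asList(URI.create("http://192.168.0.71:8091/pools")),
                "words", "");

        // Load the development view defined earlier.
        View view = client.getView("dev_words", "byword");

        // Query for a single key, equivalent to the key=%22breeze%22 example.
        Query query = new Query();
        query.setKey(ComplexKey.of("breeze"));
        query.setIncludeDocs(false);

        ViewResponse response = client.query(view, query);
        for (ViewRow row : response) {
            System.out.println(row.getKey() + " = " + row.getValue());
        }

        client.shutdown();
    }
}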


Conclusion

Hadoop on its own provides a powerful processing platform, but there is no method for actually extracting useful information from the data that is processed. By connecting Hadoop to another system, you can use it to query and extract information. Because Hadoop uses MapReduce for processing, you can take advantage of your knowledge of MapReduce through the MapReduce system in Couchbase Server to provide your querying platform. Using this method, you process in Hadoop, export from Hadoop into Couchbase Server as JSON documents, and then use MapReduce in Couchbase Server to query the processed information.

Resources

Learn

Get products and technologies

Discuss
