Create MapReduce queries to process particular types of data

Patterns and examples by scenario

MapReduce processing has created an entire set of new paradigms and structures for processing and building different types of queries. Getting the best out of Hadoop, however, means writing the appropriate MapReduce query to process the information. This article looks at a variety of scenarios with cookbook-style examples of how to develop different types of queries.

Martin C. Brown, Director of Documentation

A professional writer for over 15 years, Martin (MC) Brown is the author of and contributor to more than 26 books covering an array of topics, including the recently published Getting Started with CouchDB. His expertise spans myriad development languages and platforms: Perl, Python, Java, JavaScript, Basic, Pascal, Modula-2, C, C++, Rebol, Gawk, Shellscript, Windows, Solaris, Linux, BeOS, Microsoft WP, Mac OS, and more. He currently works as the director of documentation for Continuent.



07 January 2014


Advanced text processing

Text processing is a common use of MapReduce because parsing text is comparatively complex and processor-intensive. The basic word count is a frequent demonstration of Hadoop's ability to process large volumes of text and produce a rough summary of its content.

To get the word count, the Map phase uses a basic string tokenizer to split the text from an input file into individual words, emitting each word with a count, and the Reduce phase then sums the counts for each word. For example, from the phrase the quick brown fox jumps over the lazy dog, the Map phase produces the output in Listing 1.

Listing 1. Output from Map phase
the, 1
quick, 1
brown, 1
fox, 1
jumps, 1
over, 1
the, 1
lazy, 1
dog, 1

The Reduce phase then sums up the incidences of each unique word, resulting in the output shown in Listing 2.

Listing 2. Output from the Reduce phase
the, 2
quick, 1
brown, 1
fox, 1
jumps, 1
over, 1
lazy, 1
dog, 1
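For reference, a minimal sketch of the Map and Reduce behind this word count is shown below. It uses the standard Hadoop classes (the usual java.util and org.apache.hadoop.io and org.apache.hadoop.mapreduce imports are assumed), and the class names are illustrative rather than taken from a particular example.

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line into words with a basic string tokenizer
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);   // emit (word, 1)
        }
    }
}

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();           // sum the counts for each unique word
        }
        context.write(key, new IntWritable(sum));
    }
}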

Although this approach is practical for basic word counts, you often want to identify the incidence of specific words or key phrases. For example, consider the Amazon reviews of different movies and videos.

Using the information from the Stanford University big data project, you can download the movie review data (see Resources). The data includes both the rating and helpfulness of the original review (as reported on Amazon), as shown in Listing 3.

Listing 3. Download the movie review data
product/productId: B003AI2VGA
review/userId: A3QYDL5CDNYN66
review/profileName: abra "a devoted reader"
review/helpfulness: 0/0
review/score: 2.0
review/time: 1229040000
review/summary: Pretty pointless fictionalization
review/text: The murders in Juarez are real. This movie is a badly acted 
fantasy of revenge and holy intercession. If there is a good movie about 
Juarez, I don't know what it is, but it is not this one.

Note that although the reviewer has given the movie a score of 2 out of 5 (1 is the worst, 5 is the best), the review text describes it as a really bad movie. We need a confidence rating that tells us whether the rating given and the actual review match each other.

Many tools could be used to perform advanced heuristics, but basic processing can be achieved with lists of positive and negative words and regular-expression matching. We can then count the positive and negative matches to get a score for the individual movie, as shown in Figure 1.

Figure 1. Counting positive and negative regex matches to get a score for a movie
(Image: deriving the overall movie score from per-word scores in the raw review data)

For the Map part, count the individual words or phrases within the movie review, incrementing for positive matches and decrementing for negative ones to give a single count. The Map thus produces a score for the movie from each product review, and the Reduce then sums these scores by product ID to give an overall positive or negative rating. The Map, therefore, looks like Listing 4.

Listing 4. Map function to provide a single count for positive and negative
// List of positive words/phrases 
static String[] pwords = {"good","excellent","brilliant movie"};
// List of negative words/phrases
static String[] nwords = {"poor","bad","unwatchable"}; 


int count = 0;
// INPUT holds the text of the review; productId identifies the product being reviewed
for (String word : pwords) {
    String REGEX = "\\b" + word + "\\b";
    Pattern p = Pattern.compile(REGEX);
    Matcher m = p.matcher(INPUT);
    while (m.find()) {
        count++;
    }
}
for (String word : nwords) {
    String REGEX = "\\b" + word + "\\b";
    Pattern p = Pattern.compile(REGEX);
    Matcher m = p.matcher(INPUT);
    while (m.find()) {
        count--;
    }
}

output.collect(productId, new IntWritable(count));

The Reduce is then a traditional sum over the per-review counts, as shown in Listing 5.

Listing 5. Reduce function to sum the positive and negative reviews by product ID
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

The result is a confidence score for the reviews. The word lists can be expanded to include phrases you want to match against.
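As one possible way to turn that summed word score into the confidence rating described earlier, a final step could compare the sign of the sum with the star rating. This is an illustrative fragment only, not part of the original example; it assumes the review score has been carried through to this point (for instance, by including it in the Map output).

// Illustrative fragment: flag reviews whose text disagrees with the star rating.
// Assumes wordScoreSum (the summed positive/negative count from the Reduce) and
// avgScore (the review/score value carried through from the source data) are available.
boolean positiveText  = (wordScoreSum > 0);
boolean positiveStars = (avgScore >= 3.0f);
String verdict = (positiveText == positiveStars) ? "consistent" : "mismatch";
context.write(key, new Text(verdict));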


Read and write JSON data

JSON has become a practical data interchange format. Its utility comes partly from its simple structure and partly from the ease of parsing it in so many languages and environments.

When parsing incoming JSON data, the most common format is a single JSON record per line of input, as shown in Listing 6.

Listing 6. Single JSON record per notational line of input
{ "productId" : "B003AI2VGA", "score": 2.0, "text" : """}
{ "productId" : "B007BI4DAT", "score": 3.4, "text" : """}
{ "productId" : "B006AI2FDH", "score": 4.1, "text" : """}

This data can easily be parsed by converting each incoming string to a JSON object using a suitable library such as Gson. When using this method with Gson, you'll need to deserialize into a predetermined class, as in Listing 7.

Listing 7. Deserialize into a predetermined class
class amazonRank {
 private String productId;
 private float score;
 private String text;
 amazonRank() {
 }
}

Parse the incoming text, as shown below.

Listing 8. Parse the incoming text
public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    try {
        amazonRank rank = gson.fromJson(value.toString(), amazonRank.class);
...
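A fuller sketch of such a Map is shown below. It is illustrative only: it assumes the amazonRank class above has been given simple getters (not shown in Listing 7), and it emits the product ID with the review score using Gson's fromJson().

public static class ReviewJsonMapper extends Mapper<Object, Text, Text, FloatWritable> {
    private final Gson gson = new Gson();   // com.google.gson.Gson

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            // One JSON record per line of input
            amazonRank rank = gson.fromJson(value.toString(), amazonRank.class);
            if (rank != null) {
                context.write(new Text(rank.getProductId()),
                        new FloatWritable(rank.getScore()));
            }
        } catch (JsonSyntaxException e) {
            // Skip malformed lines rather than failing the job
        }
    }
}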

To write JSON data, do the reverse. Within your MapReduce definition, create the output class you want your JSON output to match, then use the Gson class to convert an instance of that class into its JSON representation.

Listing 9. Write JSON data
class recipeRecord {
 private String recipe;
 private String recipetext;
 private int recipeid;
 private float calories;
 private float fat;
 private float weight;
 recipeRecord() {
 }
}

Now you can populate an instance of the object during output and convert it into a single JSON record.

Listing 10. Populate an instance of the object during output
recipeRecord recipe = new recipeRecord();
recipe.recipeid = Integer.parseInt(key.toString());
recipe.calories = sum;

Gson json = new Gson();
output.collect(key, new Text(json.toJson(recipe)));

If you use a third-party library within your Hadoop processing jobs, ensure that you include your library JAR files along with your MapReduce code: $ jar -cvf recipenutrition.jar -C recipenutrition/* google-gson/gson.jar.

Another alternative, although it sits outside the Hadoop MapReduce processor, is to make use of Jaql, which can parse and process JSON data directly.


Merge datasets

Three types of merging are typically performed within a MapReduce job:

  1. Combine the content of multiple files with the same structure.
  2. Combine the content of multiple files with similar, but not identical, structures.
  3. Join data from multiple sources on a specific ID or keyword.

The first option is best handled outside of a typical MapReduce job, since it can be done using a Hadoop Distributed File System (HDFS) getmerge operation or something similar. This operation accepts a single directory as input and outputs to a specified file. For example, $ hadoop fs -getmerge srcfiles megafile merges all of the files in the srcfiles directory into one file: megafile.

Merge similar files

To merge similar but not identical files, the primary issue is how to identify the format used on the input and how to specify the format of the output. For example, given one file of name, phone, count records and a second file of name, email, phone, count records, the onus is on you to determine which format you are reading and to write a Map that generates the structure you need. With more complicated records, you may need to perform more complicated merging of fields, with and without empty values, during the Map phase to generate the information.

In truth, Hadoop is not ideal for this process unless you also use it as an opportunity to simplify, count, or otherwise reduce the information. That is, you identify the number of incoming records, what the likely format is, and perform a Reduce on the fields you want to select.
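For the name/phone example above, a minimal Map along these lines might detect the record shape by its field count and emit a normalized name, email, phone, count record, leaving the email empty where it is missing. This is an illustrative sketch only; the field layout and class name are assumptions.

public static class NormalizeMapper extends Mapper<Object, Text, Text, Text> {
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        String name, email, phone, count;
        if (fields.length == 4) {            // name, email, phone, count
            name = fields[0]; email = fields[1]; phone = fields[2]; count = fields[3];
        } else if (fields.length == 3) {     // name, phone, count (no email)
            name = fields[0]; email = "";    phone = fields[1]; count = fields[2];
        } else {
            return;                          // skip records in neither format
        }
        // Emit a normalized name, email, phone, count record
        context.write(new Text(name), new Text(email + "," + phone + "," + count));
    }
}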

Joins

Although there are potential solutions for performing joins, they often rely on processing the information in a structured way, then using this structure to determine what to do with the output information.

For example, consider two separate threads of information: email addresses with a count of sent messages, and email addresses with a count of received messages. The goal is to merge the data into a single output format. The input files have the formats email, sent-count and email, received-count; the output should be in the format email, sent-count, received-count.

Process the two incoming files so that their content is output in a common form, then rely on the Reduce function to perform the merge. In most cases, this will be a multiple-stage process:

  • One stage processes the "sent" email messages and outputs the information as email, fake#sent-count, where fake is a placeholder received count (0 in Listing 11).

    Note: The placeholder and the # separator mark these values so that, when the data is collated by email address, the Reduce can tell a sent record apart from a received-only record and join the two correctly.

  • One stage processes the "received" email messages and outputs the information as email, received-count.

When the Map functions read these files, they generate rows like the ones in Listing 11.

Listing 11. Generate rows
dev@null.org,0#sent
dev@null.org, received
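A minimal sketch of the two Map stages that produce rows like these is shown below. It is illustrative only and assumes sent records of the form email,sent-count and received records of the form email,received-count.

// Stage one: "sent" records are emitted with a fake received count of 0,
// so the value has the received#sent shape
public static class SentMapper extends Mapper<Object, Text, Text, Text> {
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");   // email, sent-count
        context.write(new Text(fields[0]), new Text("0#" + fields[1].trim()));
    }
}

// Stage two: "received" records pass the count straight through
public static class ReceivedMapper extends Mapper<Object, Text, Text, Text> {
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");   // email, received-count
        context.write(new Text(fields[0]), new Text(fields[1].trim()));
    }
}

In a single job, the two input files can be attached to their respective mappers with MultipleInputs.addInputPath().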

The Map identifies each input record and outputs a unified version with a common key. The Reduce then examines each value: if it contains the # separator, the value carries both a received and a sent count to be merged; otherwise, it is treated as a received-only value, as shown in Listing 12.

Listing 12. Merge the sent and received values for each email address in the Reduce
int sent = 0;
int received = 0;
for (Text val : values) {
    String strVal = val.toString();
    if (strVal.contains("#")) {
        // If the value contains a hash, it carries both received and sent counts
        String[] tokens = strVal.split("#");
        received += Integer.parseInt(tokens[0].trim());
        sent += Integer.parseInt(tokens[1].trim());
    } else {
        // Otherwise, it's just a received count
        received += Integer.parseInt(strVal.trim());
    }
}
// Emit email, sent-count, received-count
context.write(key, new Text(sent + "," + received));

Here we rely on the reduction within Hadoop itself to collate the output data by the key (in this case, the email address) into the information we need. Because everything is keyed on the email address, the sent and received records are merged automatically.


Tricks with keys

Keep in mind that a few aspects of the MapReduce process can be used to our advantage. In essence, MapReduce is a two-stage process:

  1. The Map stage accesses the data, picks the information you need, and outputs the information, using a key and the associated information.
  2. The Reduce stage simplifies the data by merging, summarizing, or counting the mapped data into a simpler form, using the common key.

The key is an important concept because it can be used to format and summarize data in different ways. For example, if you are reducing data about countries and city populations, output just the country as the key to summarize the data by country.

Listing 13. Output only one key
France
United Kingdom
USA

To summarize by country and city, the key is the compound version of the two.

Listing 14. The key is the compound version of the country and city
France#Paris
France#Lyon
France#Grenoble
United Kingdom#Birmingham
United Kingdom#London
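For instance, a Map building such compound keys might simply concatenate the two fields. This is an illustrative fragment; the country, city, and population variables are assumed to have been parsed from the input record.

// Illustrative fragment: build the compound country#city key in the Map
String compoundKey = country + "#" + city;            // e.g., France#Paris
context.write(new Text(compoundKey), new IntWritable(population));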

This is a fundamental trick we can use to our advantage when processing certain types of data (for example, material that shares a common key), because it lets us simulate joins. The trick is also useful when combining blog posts (identified by a blogpostid) and blog comments (which carry both a blogpostid and a blogcommentid).

To reduce the output (for example, to count the words in both the post and its comments), we first process the blog posts and the blog comments through a Map, but we output a common ID, as shown in Listing 15.

Listing 15. Reduce the output
blogpostid,the,quick,brown,fox
blogpostid#blogcommentid,jumps,over,the,lazy,dog

This outputs the post and comment content as two separate rows, distinguished by their keys. We can also reverse the relationship and attribute comment words to the blogpostid by attaching the comment ID to each word, as in Listing 16.

Listing 16. Reverse the relationship
blogpostid,the,quick,brown,fox,jumps#blogcommentid,over#blogcommentid,
the#blogcommentid,lazy#blogcommentid,dog#blogcommentid

During processing, the ID tells us which blog post a word belongs to, and the format tells us whether it came from the post itself or from a comment.


Simulating traditional database operations

Hadoop isn't really a database in the true sense, partially because we cannot perform updates, deletes, or inserts on a row-by-row basis. Although this isn't an issue in many cases (you can do a dump and load of active data for processing), there are times when you do not want to be exporting and reloading data.

One trick that enables you to avoid exporting and reloading data is to create a change file that contains a list of differences from the original dump. For now, ignore how you would produce such data from an SQL or other database; as long as the data has a unique ID we can key on, the trick works. Imagine a source file similar to the one shown in Listing 17.

Listing 17. Source file
1,London
2,Paris
3,New York

Assume a change file like the one shown in Listing 18.

Listing 18. Change file
1,DELETE
2,UPDATE,Munich
4,INSERT,Tokyo

The result is the resolved merge of the two as shown in Listing 19.

Listing 19. Merge of the source file and the change file
2,Munich
3,New York
4,Tokyo

How do we achieve such a merge through Hadoop?

One way to achieve this merge with Hadoop is to treat the current data as inserts (since every row is effectively an insert of new data into the target file) and to convert each UPDATE operation in the change file into a DELETE followed by an INSERT of the new data. In fact, it's easiest to do this by modifying the change file to look like Listing 20.

Listing 20. Achieve the merge through Hadoop
1,DELETE
2,DELETE
2,INSERT,Munich
4,INSERT,Tokyo

We can't physically merge the two files, but we can process them accordingly. For an original row or a DELETE, we output the plain key with a counter (+1 or -1). For an INSERT that we know introduces new data, we use a different key that will not be reduced against the original rows. The result is an interstitial file like the one in Listing 21.

Listing 21. Produce the interstitial file
1,1,London
2,1,Paris
3,1,New York
1,-1,London
2,-1,Paris
2#NEW,1,Munich
4#NEW,1,Tokyo

During Reduce, we sum the contents of the counter for each unique key, producing Listing 22.

Listing 22. Sum the contents of the counter for each unique key
1,0,London
2,0,Paris
3,1,New York 
2#NEW,1,Munich
4#NEW,1,Tokyo
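A minimal sketch of the Reduce that produces this first-phase output is shown below. It is illustrative only and assumes the Map emits the ID (or ID#NEW) as the key, with a value of the form counter,city.

public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    int counter = 0;
    String city = "";
    for (Text val : values) {
        // Each value has the form counter,city (for example "1,London" or "-1,London")
        String[] parts = val.toString().split(",", 2);
        counter += Integer.parseInt(parts[0].trim());
        city = parts[1].trim();
    }
    // Emit key, summed counter, city -- the rows shown in Listing 22
    context.write(key, new Text(counter + "," + city));
}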

We can then run this through a secondary MapReduce function, with the basic structure as follows in Listing 23.

Listing 23. Running the contents through a secondary MapReduce function
map:
    if (key contains #NEW):
        emit(row with the #NEW marker and counter stripped)
    else if (count > 0):
        emit(row with the counter stripped)

The secondary MapReduce pass results in the desired output shown in Listing 24, which matches the resolved merge from Listing 19.

Listing 24. Desired output from secondary MapReduce function
3,New York
2,Munich
4,Tokyo

Figure 2 demonstrates this two-phase process: first formatting and reducing the data, then simplifying the output.

Figure 2. Two-phase process of formatting, reducing, and simplifying the output
(Image: raw data being formatted and reduced across the two Map and Reduce phases)

This process requires more work than it would in a traditional database, but it offers a much simpler way to exchange data that is constantly being updated.


Summary

This article examines many different scenarios for making use of MapReduce queries. You have seen the power that these queries can have with all sorts of data, and you should now be able to take advantage of these examples in your own MapReduce solutions.
