Processing and content analysis of various document types using MapReduce and InfoSphere BigInsights

Businesses often need to analyze large numbers of documents of various file types. Apache Tika is a free open source library that extracts text contents from a variety of document formats, such as Microsoft® Word, RTF, and PDF. Learn how to run Tika in a MapReduce job within InfoSphere® BigInsights™ to analyze a large set of binary documents in parallel. Explore how to optimize MapReduce for the analysis of a large number of smaller files. Learn to create a Jaql module that makes MapReduce technology available to non-Java programmers to run scalable MapReduce jobs to process, analyze, and convert data within Hadoop.


Sajad Izadi, Partner Enablement Engineer, IBM

Sajad Izadi is a student at York University in Toronto focusing on information technology. He is completing an internship at IBM's Toronto software development lab as a member of the Toronto Information Management Business Partner team. His main responsibilities include technical verification of ReadyFor DB2 applications for business partners and aiding the big data team in partner enablement activities by developing demos used in POCs. His interests include databases, data warehousing, and application development. He is a certified IBM DB2 10.1 Administrator and a CCNA.



Benjamin G. Leonhardi, Software Engineer, IBM

Benjamin Leonhardi is the team lead for the big data/warehousing partner enablement team. Before that, he was a software developer for InfoSphere Warehouse at the IBM R&D Lab in Boeblingen, Germany, where he worked on data mining, text mining, and mining reporting solutions.



Piotr Pruski, Partner Enablement Engineer, IBM

Piotr Pruski is a partner enablement engineer within the Information Management Business Partner Ecosystem team at IBM. His main focus is to help accelerate sales and partner success by reaching out to and engaging business partners, enabling them to work with products within the IM portfolio, namely InfoSphere BigInsights and InfoSphere Streams.



29 July 2014

This article describes how to analyze large numbers of documents of various types with IBM InfoSphere BigInsights. For industries that receive data in different formats (for example, legal documents, emails, and scientific articles), InfoSphere BigInsights can provide sophisticated text analytical capabilities that can aid in sentiment prediction, fraud detection, and other advanced data analysis.

Learn how to integrate Apache Tika, an open source library that can extract the text contents of documents, with InfoSphere BigInsights, which is built on the Hadoop platform and can scale to thousands of nodes to analyze billions of documents. Typically, Hadoop works on large files, so this article explains how to efficiently run jobs on a large number of small documents. Use the steps here to create a module in Jaql that creates the integration. Jaql is a flexible language for working with data in Hadoop. Essentially, Jaql is a layer on top of MapReduce that enables easy analysis and manipulation of data in Hadoop. Combining a Jaql module with Tika makes it easy to read various documents and use the analytical capabilities of InfoSphere BigInsights, such as text analytics and data mining, in a single step, without requiring deep programming expertise.

This article assumes a basic understanding of the Java™ programming language, Hadoop, MapReduce, and Jaql. Details about these technologies are outside the scope of the article, which focuses instead on sections of code that must be updated to accommodate custom code. Download the sample data used in this article.

Overview: InfoSphere BigInsights, Tika, Jaql, and MapReduce classes

InfoSphere BigInsights is built on Apache Hadoop and enhances it with enterprise features, analytical capabilities, and management features. Apache Hadoop is an open source project that uses clusters of commodity servers to enable processing on large data volumes. It can scale from one to thousands of nodes with fault-tolerance capabilities. Hadoop can be thought of as an umbrella term. It includes two main components:

  • A distributed file system (HDFS) to store the data
  • The MapReduce framework to process data

MapReduce is a programming paradigm that enables parallel processing and massive scalability across the Hadoop cluster. Data in Hadoop is first broken into smaller pieces, called blocks, and distributed across the cluster. MapReduce can then analyze these blocks in parallel.
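
For readers new to the model, the following minimal word-count sketch shows the shape of a mapper and reducer written against the org.apache.hadoop.mapreduce API. It is not part of this article's sample code, and the class names are illustrative only.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount
{
	// Each mapper receives one split of the input and emits a (word, 1) pair per token.
	public static class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable>
	{
		private static final IntWritable ONE = new IntWritable(1);
		private final Text word = new Text();

		@Override
		protected void map(LongWritable offset, Text line, Context context)
				throws IOException, InterruptedException
		{
			for (String token : line.toString().split("\\s+"))
			{
				word.set(token);
				context.write(word, ONE);
			}
		}
	}

	// Reducers run in parallel, each aggregating all values that share a key.
	public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
	{
		@Override
		protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
				throws IOException, InterruptedException
		{
			int sum = 0;
			for (IntWritable c : counts)
				sum += c.get();
			context.write(word, new IntWritable(sum));
		}
	}
}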

Apache Tika

The Apache Tika toolkit is a free open source project used to read and extract text and other metadata from various types of digital documents, such as Word documents, PDF files, or files in rich text format. To see a basic example of how the API works, create an instance of the Tika class and open a stream by using the instance.

Listing 1. Example of Tika
import java.io.FileInputStream;
import org.apache.tika.Tika;
...
private String read() throws Exception
{
	Tika tika = new Tika();
	FileInputStream stream = new FileInputStream("/path_to_input_file.PDF");
	String output = tika.parseToString(stream);
	stream.close();
	return output;
}

If your document format is not supported by Tika (Outlook PST files are not supported, for example), you can substitute a different Java library in the previous code listing. Tika can also extract metadata, but that capability is outside the scope of this article; it is relatively simple to add to the code.
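
For illustration only (this capability is not used in the rest of the article), the lower-level parser API can populate a Metadata object alongside the extracted text. The file path and method name below are placeholders.

import java.io.FileInputStream;
import java.io.InputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.sax.BodyContentHandler;
...
private void readTextAndMetadata() throws Exception
{
	try (InputStream stream = new FileInputStream("/path_to_input_file.PDF"))
	{
		BodyContentHandler handler = new BodyContentHandler(-1);  // -1 removes the default write limit
		Metadata metadata = new Metadata();
		new AutoDetectParser().parse(stream, handler, metadata);  // fills handler and metadata in one pass
		String text = handler.toString();                         // the extracted text content
		for (String name : metadata.names())                      // for example, Content-Type or Author
			System.out.println(name + " = " + metadata.get(name));
	}
}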

Jaql

Jaql is primarily a query language for JSON, but it supports more than just JSON. It enables you to process structured and non-traditional data. Using Jaql, you can select, join, group, and filter data stored in HDFS in a manner similar to a blend of Pig and Hive. The Jaql query language was inspired by many programming and query languages, including Lisp, SQL, XQuery, and Pig. Jaql is a functional, declarative query language designed to process large data sets. For parallelism, Jaql rewrites high-level queries, when appropriate, into low-level queries consisting of Java MapReduce jobs. This article demonstrates how to create a Jaql I/O adapter over Apache Tika to read various document formats, and to analyze and transform them all within this one language.

MapReduce classes used to analyze small files

Typically, MapReduce works on large files stored on HDFS. When writing to HDFS, files are broken into smaller pieces (blocks) according to the configuration of your Hadoop cluster. These blocks reside on this distributed file system. But what if you need to efficiently process a large number of small files (specifically, binary files such as PDF or RTF files) using Hadoop?

Several options are available. In many cases, you can merge the small files into one large file by creating a sequence file, which is a native storage format for Hadoop. However, creating sequence files in a single thread can be a bottleneck, and you risk losing the original files. This article offers a different approach: customizing a few of the Java classes that MapReduce uses. The traditional classes require a dedicated mapper for each individual file, which is inefficient when there are many small files.
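
As a rough illustration of that first option, a single-threaded merge into a sequence file might look like the following sketch. The paths and method name are placeholders, the older createWriter overload is used for brevity, and this code runs outside MapReduce on one thread.

import java.io.File;
import java.nio.file.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
...
public static void mergeToSequenceFile(String localDir, String hdfsFile) throws Exception
{
	Configuration conf = new Configuration();
	FileSystem fs = FileSystem.get(conf);
	SequenceFile.Writer writer = SequenceFile.createWriter(
			fs, conf, new Path(hdfsFile), Text.class, BytesWritable.class);
	for (File f : new File(localDir).listFiles())
	{
		byte[] data = Files.readAllBytes(f.toPath());                  // whole document as raw bytes
		writer.append(new Text(f.getName()), new BytesWritable(data)); // key = file name, value = content
	}
	writer.close();
}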

InfoSphere BigInsights Quick Start Edition

InfoSphere BigInsights Quick Start Edition is a complimentary, downloadable version of InfoSphere BigInsights, IBM's Hadoop-based offering. Using Quick Start Edition, you can try out the features that IBM has built to extend the value of open source Hadoop, like Big SQL, text analytics, and BigSheets. Guided learning is available to make your experience as smooth as possible, including step-by-step, self-paced tutorials and videos to help you start putting Hadoop to work for you. With no time or data limit, you can experiment on your own time with large amounts of data. Watch the videos, follow the tutorials (PDF), and download BigInsights Quick Start Edition now.

As an alternative, process small files in Hadoop by creating a set of custom classes that tell the job the files are small enough to be treated differently from the traditional approach.

At the mapping stage, logical containers called splits are defined, and a map task runs on each split. Use custom classes to define a fixed-size split, which is filled with as many small files as it can accommodate. When a split is full, the job creates a new split and fills that one as well, and so on. Each split is then assigned to one mapper.

MapReduce classes for reading files

Three main MapReduce Java classes are used to define splits and read data during a MapReduce job: InputSplit, InputFormat, and RecordReader.

When you transfer a file from a local file system to HDFS, it is converted to blocks of 128 MB. (This default value can be changed in InfoSphere BigInsights.) Consider a file big enough to consume 10 blocks. When you read that file from HDFS as an input for a MapReduce job, the same blocks are usually mapped, one by one, to splits. In this case, the file is divided into 10 splits (which means 10 map tasks) for processing. By default, the block size and the split size are equal, but the sizes are dependent on the configuration settings for the InputSplit class.

From a Java programming perspective, the class that holds the responsibility of this conversion is called an InputFormat, which is the main entry point into reading data from HDFS. From the blocks of the files, it creates a list of InputSplits. For each split, one mapper is created. Then each InputSplit is divided into records by using the RecordReader class. Each record represents a key-value pair.

FileInputFormat vs. CombineFileInputFormat

Before a MapReduce job is run, you can specify the InputFormat class to be used. Any implementation of FileInputFormat requires you to create an instance of RecordReader, and as mentioned previously, the RecordReader creates the key-value pairs for the mappers.

FileInputFormat is an abstract class that is the basis for a majority of the implementations of InputFormat. It contains the location of the input files and an implementation of how splits must be produced from these files. How the splits are converted into key-value pairs is defined in the subclasses. Some examples of its subclasses are TextInputFormat, KeyValueTextInputFormat, and CombineFileInputFormat.

Hadoop works more efficiently with large files (files that occupy more than 1 block). FileInputFormat converts each large file into splits, and each split is created in a way that contains part of a single file. As mentioned, one mapper is generated for each split. Figure 1 depicts how a file is treated using FileInputFormat and RecordReader in the mapping stage.

Figure 1. FileInputFormat with a large file
Image shows how a file is treated using FileInputFormat and RecordReader in the mapping stage

However, when the input files are smaller than the default block size, many splits (and therefore, many mappers) are created. This arrangement makes the job inefficient. Figure 2 shows how too many mappers are created when FileInputFormat is used for many small files.

Figure 2. FileInputFormat with many small files
Image shows blocks divided into many splits, mappers

To avoid this situation, CombineFileInputFormat is introduced. This InputFormat works well with small files, because it packs many of them into one split so there are fewer mappers, and each mapper has more data to process. Unlike other subclasses of FileInputFormat, CombineFileInputFormat is an abstract class that requires additional changes before it can be used. In addition to these changes, you must ensure that you prevent splitting the input. Figure 3 shows how CombineFileInputFormat treats the small files so that fewer mappers are created.

Figure 3. CombineFileInputFormat with many small files
Image shows 1:1 correlation of splits to mappers, fewer splits

MapReduce classes used for writing files

You need to save the text content of the documents in files that are easy to process in Hadoop. You can use sequence files, but in this example, you create delimited text files that contain the contents of each file in one record. This method makes the content easy to read and easy to use in downstream MapReduce jobs. The Java classes used for writing files in MapReduce are OutputFormat and RecordWriter. These classes are similar to InputFormat and RecordReader, except that they are used for output. FileOutputFormat is an implementation of OutputFormat. It contains the path of the output directory and instructions for how the write must be performed.

RecordWriter, which is created within the OutputFormat class, defines the way each record passed from the mappers is to be written in the output path.


Implementing custom MapReduce classes

In the lab scenario used in this article, you want to process and archive a large number of small binary files in Hadoop. For example, you might need to have Hadoop analyze several research papers in PDF format. With traditional MapReduce techniques, the job takes a relatively long time to complete, simply because there are too many small files as input. Moreover, the PDF format of your files isn't natively readable by MapReduce. In addition to these limitations, storing many small files in the Hadoop distributed file system can consume a significant amount of memory on the NameNode. Roughly 1 GB for every million files or blocks is required. Therefore, files smaller than a block are inefficiently processed with traditional MapReduce techniques. It's more efficient to develop a program that has the following characteristics:

  • Is optimized to work with a large number of small files
  • Can read binary files
  • Generates fewer, larger files as the output

A better approach is to use Apache Tika to read the text within any supported document format, to develop a TikaInputFormat class that reads and processes the small files within a MapReduce task, and to use a TikaOutputFormat class to write the result. Use InputFormat, RecordReader, and RecordWriter to create the solution. The goal is to read many small PDF files and generate output that has a delimited format that looks similar to the code below.

Listing 2. Desired output
<file1.pdf>|<content of file1>
<file2.pdf>|<content of file2>
<file3.pdf>|<content of file3>
...

This output can be used later for downstream analysis. The following sections explain the details of each class.

TikaHelper to convert binary data to text

The purpose of this helper class is to convert a stream of binary data to text format. It receives a Java I/O stream as an input and returns the string equivalent of that stream.

If you are familiar with MapReduce, you know that all tasks contain some configuration parameters set at runtime. With these parameters, you can define how the job is supposed to be run — the location where the output is to reside, for example. You can also add parameters that the classes are to use.

In this application, assume you want to output a delimited file. Therefore, you need a way to replace the chosen delimiter character in the original text field with a different character and a way to replace new lines in the text with the same replacement character. For this purpose, add two parameters: com.ibm.imte.tika.delimiter and com.ibm.imte.tika.replaceCharacterWith. As shown in Listing 3, in the TikaHelper class, read those parameters from an instance of Configuration to get the replacement options. Configuration is passed from RecordReader, which creates the TikaHelper instance, described in a following section of this article.

Listing 3. TikaHelper.java constructor
public TikaHelper(Configuration conf)
{ 
	tika = new Tika();
	String confDelimiter = conf.get("com.ibm.imte.tika.delimiter");
	String confReplaceChar =
		conf.get("com.ibm.imte.tika.replaceCharacterWith");
	if (confDelimiter != null ) 
		this.delimiter = "["+ confDelimiter + "]";
	if (confReplaceChar != null ) 
		this.replaceWith = confReplaceChar;
	logger.info("Delimiter: " + delimiter);
	logger.info("Replace With character:" + replaceWith);
}

After preparing the options, call the readPath method with a stream of data to be converted to text. After replacing all the desired characters from the configuration, return the string representation of the file contents.

The replaceAll method is called on a string object and replaces every occurrence that matches the regular expression in the first argument with the replacement string in the second. Because it takes a regular expression as input, surround the delimiter characters with [ and ] so they form a character class and are not interpreted as regular expression operators. In this solution, if com.ibm.imte.tika.replaceCharacterWith is not specified, the matched characters are replaced with an empty string.
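
For example, assuming the delimiter is the pipe character, wrapping it in a character class keeps replaceAll from interpreting it as the regular expression alternation operator:

String delimiter = "[" + "|" + "]";                      // becomes "[|]", a one-character class
String cleaned = "good | bad".replaceAll(delimiter, ""); // removes the pipe only
System.out.println(cleaned);                             // prints "good  bad"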

In this article, the output is saved as delimited files. This makes them easy to read and process. However, you do need to remove newline and delimiter characters in the original text. In use cases such as sentiment analysis or fraud detection, these characters are not important. If you need to preserve the original text 100 percent, you can output the results as binary Hadoop sequence files instead.

Listing 4. TikaHelper.java readPath method
public String readPath(InputStream stream)
{
	try
	{
		String content = tika.parseToString(stream);
		content = content.replaceAll(delimiter, replaceWith);
		content = content.replaceAll(endLine, replaceWith);	
		return content;
	}
	catch (Exception e)
	{
		logger.error("Malformed PDF for Tika: " + e.getMessage());
	}
	return "Malformed PDF";
}

TikaInputFormat to define the job

Every MapReduce job must have an InputFormat. TikaInputFormat is the InputFormat developed in this solution. It extends the CombineFileInputFormat class, with Text as both the key and value type. Text is a Writable, Hadoop's serialization type for keys and values.

TikaInputFormat is used to validate the configuration of the job, split the input blocks, and create a proper RecordReader. As shown in the createRecordReader method in Listing 5, you return an instance of TikaRecordReader. As described, you don't need to split the files in TikaInputFormat because the files are assumed to be small; in any case, TikaHelper cannot read parts of a file. Therefore, the isSplitable method must return false.

Listing 5. TikaInputFormat.java
public class TikaInputFormat extends CombineFileInputFormat<Text, Text>
{
	@Override
	public RecordReader<Text, Text> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException 
	{
		return new TikaRecordReader((CombineFileSplit) split, context);
	}
	
	@Override
	protected boolean isSplitable(JobContext context, Path file) 
	{
		return false;
	}
}

TikaRecordReader to generate key-value pairs

TikaRecordReader uses the data given to the TikaInputFormat to generate key-value pairs. This class is derived from the abstract RecordReader class. This section describes the constructor and the nextKeyValue methods.

In the constructor shown in Listing 6, store the required information to carry out the job delivered from TikaInputFormat. Path[] paths stores the path of each file, FileSystem fs represents a file system in Hadoop, and CombineFileSplit split contains the criteria of the splits. Notice that you also create an instance of TikaHelper with the Configuration to parse the files in the TikaRecordReader class.

Listing 6. TikaRecordReader.java constructor
public TikaRecordReader(CombineFileSplit split, TaskAttemptContext context)
			throws IOException
{
	this.paths = split.getPaths();
	this.fs = FileSystem.get(context.getConfiguration());
	this.split = split;
	this.tikaHelper = new TikaHelper(context.getConfiguration());
}

In the nextKeyValue method shown in Listing 7, you go through each file in the Path[] array and return a key and value of type Text, which contain the file path and the content of each file, respectively. To do this, first determine whether you are already at the end of the files array. If not, move on to the next available file in the array and open an FSDataInputStream stream to it. In this case, the key is the path of the file and the value is the text content. You pass the stream to TikaHelper to read the contents for the value. (The currentStream field always points to the current file in the iteration.) Next, close the used-up stream.


This method is run once for every file in the input. Each file generates a key-value pair. As explained, when the split has been read, the next split is opened to get the records, and so on. This process also happens in parallel on other splits. In the end, by returning the value false, you stop the loop.

In addition to the following code, you must also override some default functions, as shown in the full code, available for download.

Listing 7. TikaRecordReader.java nextKeyValue
@Override
public boolean nextKeyValue() throws IOException, InterruptedException
{
	if (count >= split.getNumPaths())
	{
		done = true;
		return false; 
		//we have no more data to parse
	}
	
	Path path = null;
	key = new Text();
	value = new Text();
		
		
	try {
		path = this.paths[count];
	} catch (Exception e) {
		return false;
	}
		
	currentStream = fs.open(path);

	key.set(path.getName());
	value.set(tikaHelper.readPath(currentStream));

	currentStream.close();
	count++;

	return true; //we have more data to parse
}

TikaOutputFormat to specify output details

This class determines where and how the output of the job is stored. It must be extended from an OutputFormat class; in this case, it is extended from FileOutputFormat. As shown in Listing 8, you first allocate the path for the output, then create an instance of TikaRecordWriter to generate the output files. Just like TikaInputFormat, this class must be specified in the main method as the OutputFormat class for the job.

Listing 8. TikaOutputFormat.java
public class TikaOutputFormat extends FileOutputFormat<Text, Text>
{

	@Override
	public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
			throws IOException, InterruptedException
	{
		//to get output files in part-r-00000 format
		Path path = getDefaultWorkFile(context, ""); 
		FileSystem fs = path.getFileSystem(context.getConfiguration());
		FSDataOutputStream output = fs.create(path, context);
		return new TikaRecordWriter(output, context);
	}

}

TikaRecordWriter to create the output

This class is used to create the output. It must be extended from the abstract RecordWriter class.

In the constructor shown in Listing 9, you get the output stream, the context, and the custom configuration parameter that serves as the delimiter between the file name and its content. This parameter can be set at runtime (in the main method). If it is not specified, | is used by default.

Listing 9. TikaRecordWriter.java constructor
public TikaRecordWriter(DataOutputStream output, TaskAttemptContext context)
{
	this.out = output;
	String cDel = context.getConfiguration().get("com.ibm.imte.tika.delimiter");
	if (cDel != null)
		delimiter = cDel;
	logger.info("Delimiter character: " + delimiter);
}

In the write method shown in Listing 10, take the key and value of type Text created in the mapper and write them to the output stream. The key contains the file name, and the value contains the text content of the file. When writing these two to the output, separate them with the delimiter and end each row with a newline character.

Listing 10. TikaRecordWriter.java write
@Override
public void write(Text key, Text value) throws IOException,
		InterruptedException
{
	out.writeBytes(key.toString());
	out.writeBytes(delimiter);
	out.writeBytes(value.toString());
	out.writeBytes("\n");
}

TikaDriver to use the application

To run a MapReduce job, you need to define a driver class, TikaDriver, which contains the main method, as shown in Listing 11. You can set the TikaInputFormat as the custom InputFormat, and similarly, you can set the TikaOutputFormat as the custom OutputFormat for the job.

Listing 11. Main method
public static void main(String[] args) throws Exception
{
	int exit = ToolRunner.run(new Configuration(), new TikaDriver(), args);
	System.exit(exit);
}

@Override
public int run(String[] args) throws Exception
{
	Configuration conf = new Configuration();
	//setting the input split size: 64MB or 128MB are good values
	conf.setInt("mapreduce.input.fileinputformat.split.maxsize", 67108864);
	//set the custom Tika parameters before the Job copies this configuration
	conf.setStrings("com.ibm.imte.tika.delimiter", "|");
	conf.setStrings("com.ibm.imte.tika.replaceCharacterWith", "");
	Job job = new Job(conf, "TikaMapreduce");
	job.setJarByClass(getClass());
	job.setJobName("TikaRead");
	
	job.setInputFormatClass(TikaInputFormat.class);
	job.setOutputFormatClass(TikaOutputFormat.class);
	
	FileInputFormat.addInputPath(job, new Path(args[0]));
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(Text.class);
		
	FileOutputFormat.setOutputPath(job, new Path(args[1]));
	return job.waitForCompletion(true) ? 0 : 1;
}

Tika and Log4j API attachment

Remember to attach the Tika and Log4j JAR files when running the task. To do this in Eclipse, open the job configuration by clicking Run > Run Configurations, and in the Java MapReduce section, click the JAR Settings tab and add the JARs to the Additional JAR Files section.

Pay attention to the line in Listing 11 that sets the mapreduce.input.fileinputformat.split.maxsize configuration parameter. If the maximum split size is not defined, the job assigns all of the input files to a single split, so there is only one map task. Setting this parameter gives each split a configurable size, 64 MB in this case.

You have now finished the MapReduce job. It reads all files in the HDFS input folder and transcodes them into a delimited output file. You can then conveniently continue analyzing the data with text analytical tools, such as IBM Annotation Query Language (AQL). If you want a different output format or you want to directly transform the data, you must modify the code appropriately. Because many people are not comfortable programming Java code, this article explains how to use the same technology in a Jaql module.


Using a Jaql module rather than Java classes

This section describes how to create a Jaql module using the same technology as in the previous section and how to use this module to transform documents, load them from external file systems, and directly analyze them. A Jaql module enables you to do all of this processing, without writing any Java code, using a straightforward syntax.

The InputFormat, OutputFormat, RecordReader, and RecordWriter classes described previously, reside in the org.apache.hadoop.mapreduce and org.apache.hadoop.mapreduce.lib.output packages, which are known as the new Hadoop APIs.

To use the same approach with Jaql, you need to implement classes in the org.apache.hadoop.mapred package, which is an older version of the MapReduce APIs.

First, learn how to apply the same methods to the older package.

TikaJaqlInputFormat to validate input

This class is used to validate the input configuration for the job, split the input blocks, and create the RecordReader. It is extended from the org.apache.hadoop.mapred.MultiFileInputFormat class and contains two methods.

As shown in Listing 12, the getRecordReader method creates an instance of TikaJaqlRecordReader, and the isSplitable method returns false to stop the InputFormat from splitting the files. To be able to manipulate the input after loading it in Jaql, use the generic type JsonHolder.

Listing 12. TikaJaqlInputFormat.java
public class TikaJaqlInputFormat extends MultiFileInputFormat<JsonHolder, JsonHolder>
{

	@Override
	public RecordReader<JsonHolder, JsonHolder> getRecordReader(
			InputSplit split, JobConf job, Reporter reporter)
			throws IOException
	{
		return new TikaJaqlRecordReader(job, (MultiFileSplit) split);
	}
	@Override
	protected boolean isSplitable(FileSystem fs, Path filename)
	{
		return false;
	}
}

TikaJaqlRecordReader to generate key-value pairs

This class is used to generate the key-value pairs used in MapReduce. It implements the org.apache.hadoop.mapred.RecordReader interface to maintain compatibility with Jaql. This section describes the constructor and the next method.

In the constructor as shown in Listing 13, initialize the needed class variables. Get the split, which contains information about the files, and create a new instance of TikaHelper to read the binary files.

Listing 13. TikaJaqlRecordReader constructor
public TikaJaqlRecordReader(Configuration conf, MultiFileSplit split)
		throws IOException
{
	this.split = split;
	this.conf = conf;
	this.paths = split.getPaths();
	this.tikaHelper = new TikaHelper(conf);
}

What about OutputFormat and RecordWriter?

You don't need to implement the output part of the task because after loading the data with Jaql, you can use existing pre-defined Jaql modules to manipulate the data and write it out in various formats.

In the next method, as shown in Listing 14, iterate through all the files in the split, one after the other. After opening a stream to each file, assign the name and the contents as the elements to a new instance of BufferedJsonRecord. BufferedJsonRecord helps you keep items in an appropriate format. Jaql internally runs on JSON documents, so all data needs to be translated into valid JSON objects by the I/O adapters. The BufferedJsonRecord is then assigned as the value of the record. The key, however, remains empty.

Listing 14. TikaJaqlRecordReader next method
public boolean next(JsonHolder key, JsonHolder value) throws IOException
{
	if (count >= split.getNumPaths())
	{
		done = true;
		return false;
	}
	
	Path file = paths[count];
	fs = file.getFileSystem(conf);
	InputStream stream = fs.open(file);
	

	BufferedJsonRecord bjr = new BufferedJsonRecord();
	
	bjr.setNotSorted();
	bjr.add(new JsonString("path"), new JsonString(file.getName()));

	bjr.add(new JsonString("content"),
		new JsonString(this.tikaHelper.readPath(stream)));
		
		
	value.setValue(bjr);
		
	stream.close();
		
	count++;

	return true;
}

Creating the Jaql module

Jaql modules enable users to create packages of reusable Jaql functions and resources. Create a tika module that contains an I/O adapter. I/O adapters are passed to I/O functions and allow Jaql to read from or write to various source types, such as delimited files, sequence files, Avro files, HBase and Hive tables, and much more. This tika module enables users to read binary files supported by Apache Tika (such as Word files or PDF documents) to extract the file name and the text content. To create the tika module, export the TikaJaql classes developed previously as a JAR file. Jaql can dynamically load Java resources and add them to the class path by using the function addRelativeClassPath() to register such additional libraries.

Creating and referencing modules is straightforward in Jaql. Every Jaql script can be added as a module by adding it to the search path of Jaql. The easiest way to do this is by creating a new folder in the $JAQL_HOME/modules directory and including your files there. In this case, the module is named tika, so you need to create a folder $JAQL_HOME/modules/tika. You can then create functions within Jaql scripts and include them in this folder.

Create a custom function named tikaRead() that uses com.ibm.imte.tika.jaql.TikaJaqlInputFormat for the input format component. This function is to be used for reading, so change only the inoptions (and not the outoptions). Based on the implemented classes developed in the previous section, calling the tikaRead() function as an input for read produces one record for every input file with two fields: path, which is the full file name, and content, which is the text content of the file. Calling the tikaRead() function is similar to calling any other Jaql input I/O adapter, such as lines() or del(). Usage examples are included in a subsequent section.

Create the file tika.jaql, as shown in Listing 15 and put it in the $JAQL_HOME/modules/tika directory so it can be easily imported into other Jaql scripts. The name of the Jaql file is not relevant, but the name of the folder you created under the modules folder is important. You can also add modules dynamically using command-line options from a Jaql-supported terminal.

This code looks for the generated JAR files in /home/biadmin/. You need to copy the Tika JAR file in this folder and export your created class files as TikaJaql.jar to this folder, as well. In Eclipse, you can create a JAR file from a project with the Export command.

Listing 15. tika.jaql
addRelativeClassPath(getSystemSearchPath(), '/home/biadmin/tika-app-1.5.jar,/home/biadmin/TikaJaql.jar');

//creating the function
tikaRead = fn  (
		 location 	: string,
		 inoptions	: {*}? = null,
		 outoptions	: {*}? = null
		)
{
  location,
  "inoptions": {
   	"adapter": "com.ibm.jaql.io.hadoop.DefaultHadoopInputAdapter",
   	"format": "com.ibm.imte.tika.jaql.TikaJaqlInputFormat",
   	"configurator": "com.ibm.jaql.io.hadoop.FileInputConfigurator"
  }
};

Using Jaql

Now that the module has been created, use the following examples to help you see some possible uses of this function.

Jaql is quite flexible and can be used to transform and analyze data. It has connectors to analytical tools, such as data mining and text analytics (AQL). It has connectors to various file formats (such as line, sequence, and Avro) and to external sources (such as Hive and HBase). You can also use it to read files from the local file system or even directly from the web.

The following section demonstrates three examples for the use of the tika module in Jaql. The first example shows a basic transformation of binary documents on HDFS into a delimited file containing their text content. This example illustrates the fundamental capabilities of the module; it is equivalent to the tasks you carried out with the MapReduce job in the previous sections. The second example shows how to use Jaql to load and transform binary documents directly from an external file system source into HDFS. This can be a useful procedure if you do not want to store the binary documents in HDFS, but rather only their contents in a text or sequence file format, for instance. The load is single-threaded in this case, so it does not have the same throughput as the first approach. The third example shows how to do text analysis directly within Jaql after reading the files, without first having to extract and persist the text contents.

Using the code in Listing 16, read files inside a directory from HDFS and write the results back into HDFS. This method closely mirrors what you have done in the MapReduce job in the first section. You must import the tika module you created to be able to use the tikaRead() functionality. You then read the files in the specified folder using the read() function, and write the file names and text contents to a file in HDFS in delimited file format.

You can find additional information on Jaql in the InfoSphere BigInsights Knowledge Center.

The demo input is a set of customer reviews in Word format in a folder, as shown in Listing 16. Of the 10 reviews, some are positive and some are negative. Assume you want to extract the text and store it in delimited format. Later, you might want to perform text analytics on it. You want to keep the file name because it tells you who created the review. Normally, that relationship is documented in a separate table.

Listing 16. The input files in hdfs:/tmp/reviews/
review1.doc
review2.doc
review3.doc
...

As shown in Listing 17, run the Jaql command to read all the supported documents of this folder, extract the text, and save it into a single delimited file that has one line per original document.

Listing 17. HDFS to HDFS using Jaql
import tika(*);

read(tikaRead("/tmp/reviews")) //You could put data transformations here
	-> write(del("/tmp/output", 
			{schema:schema{path,content}, delimiter:"|", quoted:true}));

You can now find the output in the /tmp/output folder. This folder contains the text content of the Word documents originally in /tmp/reviews in the format shown below.

Listing 18. Output of Jaql Tika transformation
::::::::::::::
part-00000
::::::::::::::
"review1.doc"|"I do not care for the camera.  "
"review10.doc"|"It was very reliable "
"review2.doc"|"The product was simply bad. "
"review3.doc"|"The user interface was simply atrocious. "
"review4.doc"|"The product interface is simply broken. "
...
::::::::::::::
part-00001
::::::::::::::
"review5.doc"|"The Windows client is simply crappy. "
"review6.doc"|"I liked the camera. It is a good product. "
"review7.doc"|"It is a phenomenal camera. "
"review8.doc"|"Just an awesome product. "
"review9.doc"|"I really liked the Camera. It is excellent. "
...

You can now easily analyze the document contents with other tools like Hive, Pig, MapReduce, or Jaql. You have one part file for each map task.
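
As a minimal, hypothetical sketch of such a downstream MapReduce step, a mapper could split each line of the delimited output at the first pipe to recover the file name and its content. The class name is illustrative, and the quotation marks written by del() are left in place for brevity.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Parses lines of the form <file name>|<text content> produced by the previous step.
public class ReviewMapper extends Mapper<LongWritable, Text, Text, Text>
{
	@Override
	protected void map(LongWritable offset, Text line, Context context)
			throws IOException, InterruptedException
	{
		String[] parts = line.toString().split("\\|", 2);          // split at the first pipe only
		if (parts.length == 2)
			context.write(new Text(parts[0]), new Text(parts[1])); // key = file name, value = content
	}
}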

Using Jaql, you are not constrained by reading files exclusively from HDFS. By replacing the input path to one that points to a local disk (of the Jaql instance), you can read files from the local file system and use the write() method to copy them into HDFS, as shown in Listing 19. This approach makes it possible to load documents into InfoSphere BigInsights and transform them in a single step. The transformation is not done in parallel (because the data was not read in parallel to begin with), but if the data volumes are not so high, this method can be convenient.

If your operation is CPU-constrained, you can also use a normal read operation that runs in MapReduce. However, this method requires you to put the files on a network file system and mount it on all data nodes. The localRead command in Listing 19 runs the transformation in a local task.

Listing 19. Loading data into HDFS using Jaql
import tika(*);
localRead(tikaRead("file:///home/biadmin/Tika/CameraReviews"))
	-> write(seq("/tmp/output"));

As you can see, the only difference here is the local file path. Jaql is flexible and can dynamically change from running in MapReduce to local mode. You can continue to perform all of the data transformations and analytics in one step. However, Jaql does not run these tasks in parallel because the local file system is not parallel. Note that in the previous example, the output format is changed to a Jaql sequence file. This format is binary and faster, and you don't need to replace characters in the original text. However, the disadvantage is that the output files aren't human-readable anymore. This format is great for efficient, temporary storage of intermediate files.

This last example, in Listing 20, shows how to run a sentiment detection algorithm on a set of binary input documents. The steps for creating the AQL text analytics code are omitted because other comprehensive articles and references go into more detail; in particular, see the developerWorks article "Integrate PureData System for Analytics and InfoSphere BigInsights for email analysis" and the InfoSphere BigInsights Knowledge Center.

Listing 20. Text analysis using Jaql
import tika(*);
import systemT;
read(tikaRead("/tmp/reviews"))
	-> transform { label: $.path, text: $.content }
	-> transform { label: $.label,  sentiments: 
	systemT::annotateDocument( $, ["EmotiveTone"], 
			["file:///home/biadmin/Tika/"], 
			tokenizer="multilingual", 
			outputViews=["EmotiveTone.AllClues"])};

In a nutshell, the commands in Listing 20 read the binary input documents, extract the text content from them, and apply a simple emotive tone detection annotator written in AQL. The resulting output is similar to Listing 21.

Listing 21. Jaql output
[
   {
      "label": "review1.doc",
      "sentiments": {
         "EmotiveTone.AllClues": [
            {
               "clueType": "dislike",
               "match": "not care for"
            }
         ],
         "label": "review1.doc",
         "text": "I do not care for the camera.  "
      }
   },
   {
      "label": "review10.doc",
      "sentiments": {
         "EmotiveTone.AllClues": [
            {
               "clueType": "positive",
               "match": "reliable"
            }
         ],
         "label": "review10.doc",
         "text": "It was very reliable "
      }
   },
...

You can now use Jaql to further aggregate the results, such as counting the positive and negative sentiments by product and directly uploading the results to a database for deeper analytical queries. For more details on how to create your own AQL files or use them within Jaql, see the developerWorks article "Integrate PureData System for Analytics and InfoSphere BigInsights for email analysis" and the InfoSphere BigInsights Knowledge Center.


Archiving the files

As mentioned, HDFS is not efficient at storing many small files. Every block stored in HDFS requires some small amount of memory in the HDFS NameNode (roughly 100B). Therefore, an excessive number of small files can increase the amount of memory consumed on the NameNode. Because you have already implemented a solution to read small binary files and convert them to larger files as the output, you can now get rid of the original small files. However, you might want to reanalyze your binary files later by using different methods. Use Hadoop Archive (HAR) to reduce the memory usage on the NameNodes by packing the chosen small files into bigger files. It's essentially equivalent to Linux® TAR format, or Windows™ CAB files, but on HDFS.

Run the archive command using the template below.

Listing 22. Archive command
hadoop archive -archiveName archive_name.har -p /path_to_input_files /path_to_output_directory

The -archiveName option specifies the output file name, the -p option designates the source (parent) directory, and the final argument is the destination directory for the archive. This example includes only one source directory, but the tool can accept multiple source paths.

After the archive has been created, you can browse the content files.

Listing 23. List HAR files
hadoop fs -lsr har:///path_to_output_directory/archive_name.har

Because you have the input files in HAR format, you can now delete the original small files to fulfill the purpose of this process.

It is good to note that HAR files can be used as input for MapReduce. However, processing many small files, even in a HAR, is still inefficient because there is no archive-aware InputFormat that can convert a HAR file containing multiple small files to a single MapReduce split. This limitation means that HAR files are good as a backup method and as a way to reduce memory consumption on the NameNode, but they are not ideal as input for analytic tasks. For this reason, you need to extract the text contents of the original files before creating the HAR backup.


Conclusion

This article describes one approach to analyzing a large set of small binary documents with Hadoop using Apache Tika. This method is not the only way to implement such functionality. You can also create sequence files out of the binary files or use another storage method, such as Avro. However, the method described in this article offers a convenient way to analyze a vast number of files of various types. Combined with the Jaql module, it lets you extract contents directly while reading files from various sources.

Apache Tika is one of the most useful examples, but you can replicate the same approach with essentially any other Java library. For example, you could use another library to extract text from binary formats not currently supported by Apache Tika, such as Outlook PST files.

You can implement everything described in this article by using only Java MapReduce. However, the Jaql module created in the second part of this article is a convenient way to load and transform data in Hadoop without the need for Java programming skills. The Jaql module enables you to do the conversion process during load and to use analytical capabilities, such as text or statistical analysis, which can be completed within a single job.


Download

Description: Project and sample files for this article
Name: SampleCode.zip
Size: 26MB
