Process small, compressed files in Hadoop using CombineFileInputFormat

This article provides detailed examples that show you how to extend and implement CombineFileInputFormat to read the content of gzip (default codec) files at runtime. Learn how to use CombineFileInputFormat within the MapReduce framework to decouple the amount of data a Mapper consumes from the block size of the files in HDFS.

Sujay Som (sujaysom@in.ibm.com), Big Data Consultant and Application Architect, IBM India

Sujay Som is a certified big data consultant and application architect in the Global Business Services (GBS) Software Group (SWG) Center of Competence for big data analytics. He has more than 12 years of IT experience in architecture, design, and development.



11 February 2014


Introduction

The Apache Hadoop software library is capable of processing many types of data formats, from flat text files to databases. The MapReduce framework relies on the InputFormat of the job to validate the input specification and split up the input files into logical InputSplits — each of which is then assigned to an individual Mapper.

FileInputFormat, which is the base class for all file-based InputFormats, has the following direct subclasses:

  • TextInputFormat
  • SequenceFileInputFormat
  • NLineInputFormat
  • KeyValueTextInputFormat
  • CombineFileInputFormat

All of these InputFormats inherit a generic implementation of getSplits(JobContext) and can override the isSplitable(JobContext, Path) method to ensure that input files are not split up and are processed whole by a single Mapper. Each also implements createRecordReader(), inherited from the abstract org.apache.hadoop.mapreduce.InputFormat class, which gathers input records from the logical InputSplit for processing by the Mapper.

InfoSphere BigInsights Quick Start Edition

InfoSphere® BigInsights™ Quick Start Edition is a complimentary, downloadable version of InfoSphere BigInsights, IBM's Hadoop-based offering. Using Quick Start Edition, you can try out the features that IBM has built to extend the value of open source Hadoop, like Big SQL, text analytics, and BigSheets. Guided learning is available to make your experience as smooth as possible including step-by-step, self-paced tutorials and videos to help you start putting Hadoop to work for you. With no time or data limit, you can experiment on your own time with large amounts of data. Watch the videos, follow the tutorials (PDF), and download InfoSphere BigInsights Quick Start Edition now.

However, Hadoop performs better with a small number of large files than with a huge number of small files. ("Small" here means significantly smaller than a Hadoop Distributed File System (HDFS) block.) CombineFileInputFormat was designed to alleviate this situation by working efficiently with small files: where FileInputFormat creates one split per file, CombineFileInputFormat packs many files into each split, so each Mapper has more to process. CombineFileInputFormat can also provide benefits when processing large files. Essentially, it decouples the amount of data that a Mapper consumes from the block size of the files in HDFS.

Currently, CombineFileInputFormat is an abstract class in the Hadoop class library (org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat<K,V>) without a concrete implementation. To harvest the benefit of CombineFileInputFormat, you need to create a concrete subclass of CombineFileInputFormat and implement the createRecordReader() method, which instantiates a delegating record reader class (extending RecordReader) through its custom constructor. This approach also requires a custom Writable to be constructed as the key for the Hadoop Mapper class; for each line, that key consists of the file name and the offset of the line within the file.

Check out a Hadoop example that shows how to use CombineFileInputFormat to count the occurrences of words in the text files under a given input directory.

Here, you will learn how to extend and implement CombineFileInputFormat by adding the capability to read the content of gzip (default codec) files at runtime in decompressed form. We'll produce key-value output where the key is the combination of the file name and the offset of the line, and the value is the text of that line. The example in this article uses this custom input format within the MapReduce framework. You'll develop the following key classes:

CompressedCombineFileInputFormat
Extends CombineFileInputFormat and implements createRecordReader to pass in the record reader that does the combine file logic
CompressedCombineFileRecordReader
Extends RecordReader and is a delegate class of CombineFileRecordReader
CompressedCombineFileWritable
Implements WritableComparable, stores the file name and offset, and overrides the compareTo method to compare the file name first and then the offset

The CompressedCombineFileInputFormat example uses a MapReduce program that processes publicly available National Oceanic and Atmospheric Administration (NOAA) historical weather data. NOAA compiles extremes statistics for U.S. stations using monthly summaries for three weather elements: temperature, precipitation, and snowfall. The example here calculates the maximum temperature from the weather data (from the National Climatic Data Center), which is available in compressed (gzip) files. See Resources for details about getting the weather data.


Multiple compressed file input format

Our solution uses a compressed-file-aware CombineFileInputFormat built from three concrete classes:

  • A concrete subclass of the abstract CombineFileInputFormat (org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat<K,V>)
  • A concrete subclass of RecordReader (org.apache.hadoop.mapreduce.RecordReader<K,V>)
  • A custom Writable class that implements WritableComparable (org.apache.hadoop.io.WritableComparable) and generates the key for each line, composed of the file name and the offset of the line within that file

CompressedCombineFileInputFormat

CompressedCombineFileInputFormat.java is the subclass of CombineFileInputFormat. It implements InputFormat.createRecordReader(InputSplit, TaskAttemptContext) to construct RecordReaders for CombineFileSplits. A CombineFileSplit is a sub-collection of input files. Unlike FileSplit, the CombineFileSplit class does not represent a split of a single file; it represents a split of the input into smaller sets of files. A split may contain blocks from different files, but all the blocks in a split are typically local to the same rack. The RecordReader uses the CombineFileSplit to read one record at a time per file. Because files should not be split, the isSplitable() method is overridden to return false (it otherwise defaults to true). Listing 1 shows an example.

Listing 1. CompressedCombineFileInputFormat.java
package com.ssom.combinefile.lib.input;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;


public class CompressedCombineFileInputFormat 
extends CombineFileInputFormat<CompressedCombineFileWritable, Text>  {
	
	public CompressedCombineFileInputFormat() {
		super();
	}

	public RecordReader<CompressedCombineFileWritable, Text> createRecordReader(
			InputSplit split, TaskAttemptContext context) throws IOException {
		// Delegate to CombineFileRecordReader, which hands each file chunk in the
		// CombineFileSplit to a CompressedCombineFileRecordReader instance.
		return new CombineFileRecordReader<CompressedCombineFileWritable, Text>(
				(CombineFileSplit) split, context,
				CompressedCombineFileRecordReader.class);
	}
	
	@Override
	protected boolean isSplitable(JobContext context, Path file){
	  return false;
	}

}

CompressedCombineFileRecordReader

CompressedCombineFileRecordReader.java is a delegate class of CombineFileRecordReader, which is a generic RecordReader that can hand out different RecordReaders for each chunk in a CombineFileSplit. CombineFileSplit can combine data chunks from multiple files. This class lets you use different RecordReaders for processing data chunks from different files.

When the Hadoop job runs, CombineFileInputFormat reads the sizes of all the files under the HDFS input path and decides how many splits are required, based on the configured maximum split size. For each file chunk in a split (each chunk is a whole file, because isSplitable() is overridden to return false), CombineFileRecordReader creates a CompressedCombineFileRecordReader instance and passes in the CombineFileSplit, the context, and the index that CompressedCombineFileRecordReader uses to locate the file it should process.
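The maximum split size is something you set when configuring the job; a larger value packs more small files into each split. The following is a minimal sketch, assuming the Hadoop 2 mapreduce API and a hypothetical input path (neither the property usage below nor the path comes from the article's code):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeConfigSketch {
	public static void main(String[] args) throws Exception {
		Job job = Job.getInstance();
		// Hypothetical input directory, used only for illustration.
		FileInputFormat.addInputPath(job, new Path("/user/biadmin/weather/input"));

		// CombineFileInputFormat consults this property when it groups files into
		// splits; here each combined split is capped at roughly 128 MB.
		// (Assumption: Hadoop 2.x property name; Hadoop 1.x used mapred.max.split.size.)
		job.getConfiguration().setLong(
				"mapreduce.input.fileinputformat.split.maxsize", 128L * 1024 * 1024);
	}
}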

After it's instantiated, CompressedCombineFileRecordReader determines whether the input file is compressed with a codec (org.apache.hadoop.io.compress.GzipCodec by default). If so, the file is decompressed at runtime and the decompressed copy is used for processing; otherwise, it is treated as a plain text file. While processing the file, CompressedCombineFileRecordReader creates a CompressedCombineFileWritable as the key for the invoked Mapper class. For each line read, the CompressedCombineFileWritable holds the file name and the offset of that line, as shown in the MapReduce example.

Listing 2 shows an example of CompressedCombineFileRecordReader.java.

Listing 2. CompressedCombineFileRecordReader.java
package com.ssom.combinefile.lib.input;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;

/**
 * RecordReader responsible for extracting records from a chunk
 * of the CombineFileSplit.
 */
public class CompressedCombineFileRecordReader 
  	extends RecordReader<CompressedCombineFileWritable, Text> {

	private long startOffset;
	private long end; 
	private long pos; 
	private FileSystem fs;
	private Path path;
	private Path dPath;
	private CompressedCombineFileWritable key = new CompressedCombineFileWritable();
	private Text value;
	private long rlength;
	private FSDataInputStream fileIn;
	private LineReader reader;
	  
 
	public CompressedCombineFileRecordReader(CombineFileSplit split,
			TaskAttemptContext context, Integer index) throws IOException {

		Configuration currentConf = context.getConfiguration();
		this.path = split.getPath(index);

		// Decompress the file up front if a codec matches its extension;
		// codecWiseDecompress() writes the decompressed copy to dPath and
		// records its length in rlength.
		boolean isCompressed = findCodec(currentConf, path);
		if (isCompressed)
			codecWiseDecompress(context.getConfiguration());

		fs = this.path.getFileSystem(currentConf);

		this.startOffset = split.getOffset(index);

		if (isCompressed) {
			this.end = startOffset + rlength;
		} else {
			this.end = startOffset + split.getLength(index);
			dPath = path;
		}

		boolean skipFirstLine = false;

		fileIn = fs.open(dPath);

		// The decompressed copy is temporary; remove it when the task exits.
		if (isCompressed) fs.deleteOnExit(dPath);

		// If this chunk does not start at the beginning of the file, back up one
		// byte and discard the partial first line (it belongs to the previous chunk).
		if (startOffset != 0) {
			skipFirstLine = true;
			--startOffset;
			fileIn.seek(startOffset);
		}
		reader = new LineReader(fileIn);
		if (skipFirstLine) {
			startOffset += reader.readLine(new Text(), 0,
				(int) Math.min((long) Integer.MAX_VALUE, end - startOffset));
		}
		this.pos = startOffset;
	}

	  public void initialize(InputSplit split, TaskAttemptContext context)
	      throws IOException, InterruptedException {
	  }

	  public void close() throws IOException { }

	  public float getProgress() throws IOException {
		    if (startOffset == end) {
		      return 0.0f;
		    } else {
		      return Math.min(1.0f, (pos - startOffset) / (float)
                  (end - startOffset));
		    }
	  }

	  public boolean nextKeyValue() throws IOException {
		    if (key.fileName== null) {
		      key = new CompressedCombineFileWritable();
		      key.fileName = dPath.getName();
		    }
		    key.offset = pos;
		    if (value == null) {
		      value = new Text();
		    }
		    int newSize = 0;
		    if (pos < end) {
		      newSize = reader.readLine(value);
		      pos += newSize;
		    }
		    if (newSize == 0) {
		      key = null;
		      value = null;
		      return false;
		    } else {
		      return true;
		    }
	  }

	  public CompressedCombineFileWritable getCurrentKey() 
	      throws IOException, InterruptedException {
		  return key;
	  }

	  public Text getCurrentValue() throws IOException, InterruptedException {
		  return value;
	  }
  
 
	private void codecWiseDecompress(Configuration conf) throws IOException{
		  
		 CompressionCodecFactory factory = new CompressionCodecFactory(conf);
		 CompressionCodec codec = factory.getCodec(path);
		    
		    if (codec == null) {
		    	System.err.println("No Codec Found For " + path);
		    	System.exit(1);
		    }
		    
		    String outputUri = CompressionCodecFactory.removeSuffix(
		    		path.toString(), codec.getDefaultExtension());
		    dPath = new Path(outputUri);
		    
		    InputStream in = null;
		    OutputStream out = null;
		    fs = this.path.getFileSystem(conf);
		    
		    try {
		    	in = codec.createInputStream(fs.open(path));
		    	out = fs.create(dPath);
		    	IOUtils.copyBytes(in, out, conf);
		    	} finally {
		    		IOUtils.closeStream(in);
		    		IOUtils.closeStream(out);
					rlength = fs.getFileStatus(dPath).getLen();
		    	}
	  }
	
	private boolean findCodec(Configuration conf, Path p){
		
		CompressionCodecFactory factory = new CompressionCodecFactory(conf);
	    CompressionCodec codec = factory.getCodec(path);
	    
	    if (codec == null) 
	    	return false; 
	    else 
	    	return true;

	}
  
}

CompressedCombineFileWritable

The CompressedCombineFileWritable.java class implements WritableComparable, which in turn extends org.apache.hadoop.io.Writable and java.lang.Comparable, as shown in Listing 3.

Writable is a serializable object that implements a simple and efficient serialization protocol based on DataInput and DataOutput. Any key or value type in the MapReduce framework implements this interface. Implementations typically use a static read(DataInput) method that constructs a new instance, calls readFields(DataInput), and returns the instance.
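For example, a minimal sketch of that pattern for CompressedCombineFileWritable could be a static helper added to the class in Listing 3 (which already imports DataInput and IOException); this read() method is illustrative and is not part of the article's code:

	// Conventional Writable factory: build an instance, populate it from the
	// stream via readFields(DataInput), and return it.
	public static CompressedCombineFileWritable read(DataInput in) throws IOException {
		CompressedCombineFileWritable w = new CompressedCombineFileWritable();
		w.readFields(in);
		return w;
	}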

Comparable is an interface that's a member of the Java™ collections framework (JCF). It imposes a total ordering on the objects of each class that implements it. This ordering is called the class's natural ordering. The class's compareTo method is called its natural comparison method. Lists (and arrays) of objects that implement this interface can be sorted automatically by Collections.sort (and Arrays.sort). Objects that implement this interface can be used as keys in a sorted map, or as elements in a sorted set, without the need to specify a comparator.

Because of these properties, CompressedCombineFileWritable instances can be compared using comparators. The hashCode() method is frequently used in Hadoop to partition keys, so it's important that the implementation of hashCode() returns the same result across different instances of the JVM. The default hashCode() implementation in Object does not satisfy this property, so the hashCode(), equals(), and toString() methods are overridden for consistency and efficiency.

Listing 3. CompressedCombineFileWritable.java
package com.ssom.combinefile.lib.input;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;


/**
 * This record keeps filename,offset pairs.
 */

@SuppressWarnings("rawtypes")

public class CompressedCombineFileWritable implements WritableComparable {

    public long offset;
    public String fileName;
    
    
 
    public CompressedCombineFileWritable() {
		super();
	}

	public CompressedCombineFileWritable(long offset, String fileName) {
		super();
		this.offset = offset;
		this.fileName = fileName;
	}

	public void readFields(DataInput in) throws IOException {
      this.offset = in.readLong();
      this.fileName = Text.readString(in);
    }

    public void write(DataOutput out) throws IOException {
      out.writeLong(offset);
      Text.writeString(out, fileName);
    }

    
    public int compareTo(Object o) {
      CompressedCombineFileWritable that = (CompressedCombineFileWritable)o;

      int f = this.fileName.compareTo(that.fileName);
      if(f == 0) {
        return (int)Math.signum((double)(this.offset - that.offset));
      }
      return f;
    }
    @Override
    public boolean equals(Object obj) {
      if(obj instanceof CompressedCombineFileWritable)
        return this.compareTo(obj) == 0;
      return false;
    }
    @Override
    public int hashCode() {
  
    	final int hashPrime = 47;
        int hash = 13;
        hash =   hashPrime* hash + (this.fileName != null ? this.fileName.hashCode() : 0);
        hash =  hashPrime* hash + (int) (this.offset ^ (this.offset >>> 16));
    	
        return hash; 
    }
    @Override
    public String toString(){
    	return this.fileName+"-"+this.offset;
    }

  }

MapReduce example

The example in this section shows how to use CompressedCombineFileInputFormat with a sample MapReduce program. The MapReduce program uses NOAA's historical weather data, with compiled extremes statistics for U.S. stations based on monthly summaries for temperature, precipitation, and snowfall. The example calculates the maximum temperatures from the weather data, which is stored in compressed (gzip) files.

The example walks you through the facets of CompressedCombineFileInputFormat usage. InfoSphere BigInsights Quick Start Edition was used to run this example.

Assumptions

The example assumes:

  • The Hadoop environment is installed and running.
  • Data required for the example is downloaded from NOAA's National Climatic Data Center.
  • The downloaded data is ingested in HDFS.
  • The operational statistics provided were collected in InfoSphere BigInsights Quick Start Edition (a non-production environment).

Running the example

CompressedCombineFileInputFormat's classes are packed into a JAR file (CompressedCombineFileInput.jar) so you can refer to them from other projects. We'll use separate MapReduce programs to illustrate the performance difference between the default input format (org.apache.hadoop.mapreduce.lib.input.TextInputFormat) and the custom input format (com.ssom.combinefile.lib.input.CompressedCombineFileInputFormat).

Input data

NCDC data is publicly available. Table 1 shows the format of the NCDC data in our examples.

Table 1. NCDC data
Reading    Description
0057# MASTER STATION catalog identifier
332130# USAF weather station identifier
99999# WBAN weather station identifier
19470101# observation date
0300# observation time
4# data source flag
+51317# latitude (degrees x 1000)
+028783# longitude (degrees x 1000)
FM-12# report type code
+0171# elevation (meters)
99999# call letter identifier
V020# quality control process name
320# wind direction (degrees)
1# quality code
N# type code
0072# speed rate
1# speed quality code
00450# sky ceiling height (meters)
1# quality code
CN# ceiling height dimension
010000# visibility distance (meters)
1# quality code
N9# variability code
-0128# air temperature (degrees Celsius x 10)
1# quality code
-0139# dew point temperature (degrees Celsius x 10)
1# quality code
10268# atmospheric pressure (hectopascals x 10)
1# quality code

The files of data are organized by date and weather station. There is a directory for each year starting in 1901. Each directory contains a compressed file for each weather station with its readings for that year. Figure 1 shows the first entries for 1947.

Figure 1. Sample list of files

I selected data for 1901, 1902, and 1947, which entails more than 1,000 compressed files. The total size of the files is approximately 56 MB.

Using the default input format

The Mapper class is a generic type with four formal type parameters that specify the input key, input value, output key, and output value types of the map function, as shown below.

public class MaxTemperatureMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {

For the default input format, the:

  • Input key is a long integer offset
  • Input value is a line of text
  • Output key is a year
  • Output value is an air temperature (an integer)

The map() method is passed a key and a value. It converts the Text value containing the line of input into a Java String, then uses the substring() method to extract the fields it needs. In this case, the Mapper writes the year as a Text object (because we're just using it as a key) and the temperature wrapped in an IntWritable. An output record is written only if the temperature is present and the quality code indicates the temperature reading is OK.
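A minimal sketch of that map() method follows; the fixed-width field positions and the quality-code pattern are assumptions based on the standard NCDC record layout, not code reproduced from the article:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends
		Mapper<LongWritable, Text, Text, IntWritable> {

	private static final int MISSING = 9999;

	@Override
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {

		String line = value.toString();
		String year = line.substring(15, 19);           // assumed year field position

		int airTemperature;
		if (line.charAt(87) == '+') {                   // skip an explicit '+' sign
			airTemperature = Integer.parseInt(line.substring(88, 92));
		} else {
			airTemperature = Integer.parseInt(line.substring(87, 92));
		}
		String quality = line.substring(92, 93);

		// Emit only readings that are present and whose quality code is acceptable.
		if (airTemperature != MISSING && quality.matches("[01459]")) {
			context.write(new Text(year), new IntWritable(airTemperature));
		}
	}
}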

The Reducer class is also a generic type with four formal type parameters that specify its input and output types. The output types of the reduce function are Text and IntWritable: a year and its maximum temperature, which the reducer finds by iterating through the temperatures and comparing each with a record of the highest found so far. The Reducer class definition is:

public class MaxTemperatureReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
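A sketch of the corresponding reduce() method, assuming the straightforward max-finding loop described above (the body is illustrative rather than the article's exact source):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends
		Reducer<Text, IntWritable, Text, IntWritable> {

	@Override
	public void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {

		// Track the highest temperature seen so far for this year.
		int maxValue = Integer.MIN_VALUE;
		for (IntWritable value : values) {
			maxValue = Math.max(maxValue, value.get());
		}
		context.write(key, new IntWritable(maxValue));
	}
}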

The output will look like the information below if there are multiple years.

  • 1901 — 278
  • 1947 — 283

Figures 2 and 3 show the results of running the program using the default (Text) input format in the InfoSphere BigInsights Administration Console.

Figure 2. Application status — default execution

Figure 3. Completion

The InfoSphere BigInsights Administration Console provides the key statistics shown in Table 2.

Table 2. Resulting statistics
Key attribute    Result
Name Find_Max_Temperature_Default_Input_Format
Job ID job_201401020703_0001
Map % Complete 100%
Reducer % Complete 100%
Start Time 2014-01-02 07:09
End Time 2014-01-02 07:43
User Name biadmin
Priority NORMAL
Total time to execute and get results is 34 minutes.

Figure 4 shows the output of the MapReduce program.

Figure 4. Response file content — default execution

Using the custom input format

Again, you'll use a Mapper class, which is a generic type, this time with CompressedCombineFileWritable as the input key type, as shown below.

public class MaxTMapper extends
	Mapper<CompressedCombineFileWritable, Text, Text, IntWritable> {

The processing logic is the same as in the previous case (see Using the default input format).

The Reducer class, which remains the same with no changes, has the definition:

public class MaxTemperatureReducer extends
		Reducer<Text, IntWritable, Text, IntWritable>  {

To set the job's input format, we use:

job.setInputFormatClass(CompressedCombineFileInputFormat.class);
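For context, a driver for the custom-format job could be wired up roughly as follows; the driver class name, the argument handling, and the use of the Hadoop 2 Job API are assumptions for illustration, not the article's code:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.ssom.combinefile.lib.input.CompressedCombineFileInputFormat;

public class MaxTemperatureDriver {
	public static void main(String[] args) throws Exception {
		Job job = Job.getInstance();
		job.setJarByClass(MaxTemperatureDriver.class);
		job.setJobName("Find_Max_Temperature_Custom_Input_Format");

		// Read the many small gzip files through the custom combining input format.
		job.setInputFormatClass(CompressedCombineFileInputFormat.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.setMapperClass(MaxTMapper.class);
		job.setReducerClass(MaxTemperatureReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}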

The sample key and value are:

Key: 227070-99999-1901-116370

Line:
0029227070999991901101520004+62167+030650FM
-12+010299999V0201401N003119999999N0000001N9+00331+99999102311
ADDGF100991999999999999999999

Figures 5 and 6 show the result of running the custom (CompressedCombineFileInputFormat) input format from the InfoSphere BigInsights Administration Console.

Figure 5. Application status — custom execution

Figure 6. Completion
The InfoSphere BigInsights Administration Console provides the key statistics shown in Table 3.

Table 3. Resulting statistics for the custom input format
Key attribute    Result
Name Find_Max_Temperature_Custom_Input_Format
Job ID job_201401020703_0002
Map % Complete 100%
Reducer % Complete 100%
Start Time 2014-01-02 08:32
End Time 2014-01-02 08:37
User Name biadmin
Priority NORMAL
Total time to execute and get results is 5 minutes.

Figure 7 shows the output of the MapReduce program.

Figure 7. Response file content — custom execution

Conclusion

When you can, it's a good idea to avoid many small files, because MapReduce works best when it can operate at the transfer rate of the disks in the cluster, and processing many small files increases the number of seeks needed to run a job. If, for business or strategic reasons, you do have a large number of small files in HDFS, CombineFileInputFormat can be useful. It is not only good for small files; it can also bring performance benefits when processing large files.
