Analyzing large datasets with Hive


The big data industry has mastered the art of gathering and logging terabytes of data. The harder problem is turning that real data into forecasts and decisions, which is why Apache Hive is so important. Hive projects a structure onto the data and lets you query it in a SQL-like language. The queries are executed as Map and Reduce tasks over large datasets.

InfoSphere BigInsights is a similar proprietary system from IBM. It is based on the open source Apache Hadoop project and adds functionality for enterprise deployment. It makes it easier to use Hadoop and build applications driven by large datasets. Like Hive, it provides a SQL interface for Hadoop, so the user can access data in BigInsights without having to learn a new programming language. It also provides high availability for the BigInsights NameNode (also known as the MasterNode) through seamless, transparent failover, which reduces system downtime.

We will present the analysis of Call Data Records (CDRs) with Hive, as an example of implementing big data analytics. CDR is a term used in telecommunications, referring to any event that may be used to charge a subscriber. Events such as call initiation, termination, duration, and Internet data transfer over smartphones are examples of services that get logged in data stores and are used for billing subscribers.

Before we start

We recommend you have basic knowledge of the following technologies and concepts. Please refer to the Related topics section for tutorials that cover the following:

  • How to write basic SQL DDL and DML scripts such as select, create, insert.
  • How to write and compile basic Java™ programs.
  • How to package Java bytecode into a Java Archive (JAR) file.

Using CDRs as an example

A Call Detail Record (CDR), also known as a Call Data Record, is a data record produced by a telephone exchange or other telecom equipment documenting the details of a phone call that passed through the facility or device. A CDR consists of data fields that describe a telecom transaction. These data fields can consist of the following:

  • Subscriber phone number
  • Recipient phone number
  • Start timestamp
  • Call duration
  • Billing phone number
  • Telephone exchange equipment ID
  • Record ID
  • Disposition or the results of the call (whether the status of call was busy or failed)
  • The route by which the call entered the exchange
  • The route by which the call left the exchange
  • Call type (voice, SMS, etc.)

For the purposes of this article, along with CDRs, we will also use another dataset known as network logs. A network log is a record of network-centric activity when a user performs a call, accesses the web or email, or simply shifts to another cellular tower. Some of the relevant data fields for a network log are shown here:

  • Timestamp of the event
  • IMSI (a unique ID identifying a subscriber on a cellular network)
  • IMEI (a unique ID identifying a mobile phone)
  • Call type (code for voice call, SMS, etc.)
  • Cell type (code for type of the cellular tower recording this information)
  • Cell ID (ID of the cellular tower recording this information)
  • Subscriber phone number
  • Latitude (geographical coordinate of the tower)
  • Longitude (geographical coordinate of the tower)

Telecom providers want to estimate trends from real data in order to plan future upgrades and deployments. As an example, a typical provider would want to know which equipment, such as a cell tower, originates the bulk of the calls. Another valuable data point is which base stations serve as the busiest switching hubs at different times of the day (the diurnal traffic pattern), and what sorts of calls are frequently made to various locations. We will correlate these two big data sources, which typically amount to hundreds of gigabytes per day for a moderate-size cellular network.

It is pertinent to mention that these call logs contain data for every cellular transaction being made on any cellular network. In recent news, Verizon gave unprecedented access to the National Security Agency (NSA) to scoop up call logs directly from Verizon's servers. This translated into millions of entries of call logs specifying the senders' and receivers' numbers, the duration of calls, and the location of the end-point towers.

Using Hive for big data analytics

Let's start by correlating the two datasets defined above. For this purpose, Hive provides SQL-like join semantics. An inner join is the most common join operation used in applications and can be regarded as the default join type. The inner join combines column values of two tables, say A (CDR) and B (network logs), based on the join predicate. Conceptually, an inner join query compares each row of A with each row of B to find all pairs of rows that satisfy the join predicate. When the join predicate is satisfied, the column values from the matching rows of A and B are combined to form a new result record. An inner join can be thought of as taking the Cartesian product of the two tables and then returning only those records that satisfy the join predicate. A simplistic representation of a join query is shown next.

Inner join predicate

The join predicate is a condition, defined by 'on' and written in the query in Listing 1.

Listing 1. Inner join predicate
hive> insert overwrite table correlation
      partition(dt=20130101, hour=12)
      select cdr.timestamp,       cdr.subscriberPhone,
             cdr.recipientPhone,  cdr.duration,
             cdr.billingPhone,    cdr.exchangeID,
             cdr.recordID,        cdr.callResult,
             cdr.entryRoute,      cdr.exitRoute,
             cdr.callType,
             net.dtstamp,         net.minute,
             net.second,          net.IMSI,
             net.IMEI,            net.callType,
             net.cellType,        net.cellID,
             net.subscriberPhone, net.cellLat,
             net.cellLon
      from cdr join net
      on (cdr.subscriberPhone = net.subscriberPhone)
      where combine(net.dtstamp,
                    net.hour,
                    net.minute,
                    net.second) <= cdr.timestamp
      and cdr.timestamp like '2013010112%'
      and net.dtstamp like '20130101'
      and net.hour = 12;

Let's walk through the query in detail. The insert overwrite part of the statement tells the Hive interpreter to insert the selected fields into the table named after the table keyword, which in this case is correlation. The join keyword and the where clause adhere strictly to standard SQL syntax.
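To make the join semantics concrete outside Hive, the following plain-Java sketch walks the Cartesian product of two tiny in-memory tables and keeps only the pairs that pass the on predicate and a timestamp filter. It is an illustration of the idea only, not how Hive executes the query. The class name, the two-field row layout, and the sample phone numbers and timestamps are assumptions made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for the cdr and net tables:
// every row is {subscriberPhone, timestamp}.
class InnerJoinSketch {

    // Naive inner join: visit every pair of rows and keep the pairs that
    // satisfy both the join predicate and the timestamp filter.
    static List<String[]> innerJoin(List<String[]> cdr, List<String[]> net) {
        List<String[]> result = new ArrayList<>();
        for (String[] c : cdr) {
            for (String[] n : net) {
                boolean samePhone = c[0].equals(n[0]);        // the "on" predicate
                boolean notLater = n[1].compareTo(c[1]) <= 0; // the "where" filter
                if (samePhone && notLater) {
                    result.add(new String[] {c[0], c[1], n[1]});
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> cdr = Arrays.asList(
            new String[] {"5550001", "20130101120500"},
            new String[] {"5550002", "20130101121000"});
        List<String[]> net = Arrays.asList(
            new String[] {"5550001", "20130101120430"},
            new String[] {"5550001", "20130101120700"},
            new String[] {"5550003", "20130101120100"});
        for (String[] row : innerJoin(cdr, net)) {
            System.out.println(String.join(",", row));
        }
    }
}
```

With the sample rows, only the first CDR row and the first network-log row share a phone number and satisfy the timestamp condition, so a single combined row is printed.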

The correlation table can be created using the standard SQL create table command. In the query above, the partition keyword defines which partition of the correlation table the data should be inserted into. A table can have one or more partition columns, and a separate data directory is created for each distinct combination of values in the partition columns. Listing 2 is an example of how to create a table with partitions, using the correlation table. Because column names within a Hive table must be unique, the network-log copies of callType and subscriberPhone are named netCallType and netSubscriberPhone in the listing.

Listing 2. Table correlation
hive> create table correlation(
      timestamp string,          subscriberPhone string,
      recipientPhone string,     duration int,
      billingPhone string,       exchangeID string,
      recordID string,           callResult string,
      entryRoute string,         exitRoute string,
      callType string,           dtstamp string,
      minute int,                second int,
      IMSI string,               IMEI string,
      netCallType string,        cellType string,
      cellID string,             netSubscriberPhone string,
      cellLat string,            cellLon string)
      partitioned by (dt string, hour int)
      location '/mnt/user/hive/warehouse/correlation';

We also use a custom Hive user-defined function (UDF) (see Related topics for more information). A UDF is a custom function written by the user that can be loaded into the Hive command-line interface (CLI) and used repeatedly. UDF is a generic term as well as the name of a Java class. Our UDF is registered as combine(), which simply combines the date, hour, minute, and second into the required format, such as yyyyMMddHHmmss. The code for the DateCombiner UDF Java class is shown below. Note that we extended the UDF Java class. You could also use the GenericUDF Java class, which is a newer implementation of the UDF semantics. It provides better performance because of its use of lazy evaluation and short-circuiting. It also supports non-primitive parameters and a variable number of arguments.

The UDF Java class is easier to use than the GenericUDF Java class and offers acceptable performance. It uses the Reflection API, so it is slightly slower than the GenericUDF implementation. It does not accept or return non-primitive types such as arrays, structs, or maps, and it does not support a variable number of arguments. Use the GenericUDF Java class if you need to handle complex data types. For most other cases, the UDF Java class is sufficient.

The first thing to do when writing a UDF is to define a Java package. Next, the UDF Java class is imported so that the Java compiler can resolve the parent class whose methods the inheriting class uses. The import declaration is only a compile-time element of the source code and has no presence at runtime, because JVM bytecode always uses fully qualified class names.

Next, a class is declared that extends the parent UDF Java class. An evaluate() method is defined and contains the logic of the program. In short, a UDF must satisfy the following two properties:

  1. A UDF must be a subclass of the UDF Java class or GenericUDF Java class.
  2. A UDF must implement at least one evaluate() method (see Listing 3).

Listing 3. Defining an evaluate() method
package HiveUDF;

import org.apache.hadoop.hive.ql.exec.UDF;

public final class DateCombiner extends UDF {
    // Combines a date (yyyyMMdd) with hour, minute, and second into
    // yyyyMMddHHmmss, prefixing fields that are not two characters long with "0".
    public String evaluate(final String dt, final String hh, final String mm,
            final String ss) {
        if (dt == null || hh == null || mm == null || ss == null) { return null; }
        String ho = "", mi = "", se = "";
        if (hh.length() != 2) { ho = "0".concat(hh); } else { ho = hh; }
        if (mm.length() != 2) { mi = "0".concat(mm); } else { mi = mm; }
        if (ss.length() != 2) { se = "0".concat(ss); } else { se = ss; }
        return dt + ho + mi + se;
    }
}
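
To see what combine() is meant to return without starting Hive, the padding logic can be exercised in plain Java. The sketch below is a stand-alone copy of that logic with no dependency on the Hive JAR. The class name DateCombinerCheck, the pad() helper, and the sample values are assumptions introduced for this check and are not part of the UDF itself.

```java
// Stand-alone copy of the padding logic, with no Hive dependency, so that it
// can be compiled and run directly. Names here are hypothetical.
class DateCombinerCheck {

    // Prefix a field with "0" unless it is already two characters long.
    static String pad(String field) {
        return field.length() == 2 ? field : "0" + field;
    }

    // Same contract as the UDF's evaluate(): null in, null out.
    static String combine(String dt, String hh, String mm, String ss) {
        if (dt == null || hh == null || mm == null || ss == null) {
            return null;
        }
        return dt + pad(hh) + pad(mm) + pad(ss);
    }

    public static void main(String[] args) {
        // 1 Jan 2013, 12:05:09 -> prints 20130101120509
        System.out.println(combine("20130101", "12", "5", "9"));
    }
}
```

The result is a 14-character string, which is what lets the join query compare it directly with the timestamp column of the CDR table.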

There are a couple of important steps to perform before a UDF can be used and the above join query can be executed. First, the UDF has to be compiled and packaged into a JAR file. Compilation requires that the Hive executable JAR file be on the Java classpath (see Listing 4).

Listing 4. Adding the Hive executable JAR file
$ javac -cp /apps/hive/lib/hive-exec-0.7.1-cdh3u3.jar HiveUDF/DateCombiner.java
$ jar cf HiveUDF.jar HiveUDF/

Next, the UDF JAR file has to be loaded into the Hive CLI. This should be followed by creating a temporary function, called combine, which corresponds to the DateCombiner UDF Java class (see Listing 5).

Listing 5. The combine function
hive> add jar /home/verrami1/work/JARS/HiveUDF.jar;
hive> create temporary function combine as 'HiveUDF.DateCombiner';

We have now covered how to use a UDF Java class in an inner join over a sizeable dataset, which in our case is the pair of CDR and network log tables. Next, we list some general Hive best practices for big data analytics.

Tuning Hive tasks for speed and efficiency

In this section, we describe some of the best practices for operating and executing queries on Hive. Although these practices aren't specific to our telecom use case, they are applicable given the huge amounts of data processed by telecom companies.

Data partitioning

We need to make sure that partitions are defined on suitable criteria when a table is created, because good partitioning speeds up Hive tasks and can save a lot of troubleshooting. The best way to partition tables is to use one or more criteria, such as date and hour, depending on the expected size of the table. In our example of analyzing CDR and network logs, we expect about a gigabyte of CDR data to be logged into the system every hour. Now multiply this by 24 hours and imagine running a select query over the data gathered in a single day (approximately 24 GB). This call data will not stop pouring into the database, and if proper care is not taken in data partitioning, you may end up requiring a huge cluster to perform even the most menial of queries over your database. Listing 6 demonstrates one smart approach to partitioning data.

Listing 6. Smart approach to partitioning data
hive> create table correlation(
      timestamp string,          subscriberPhone string,
      recipientPhone string,     duration int,
      billingPhone string,       exchangeID string,
      recordID string,           callResult string,
      entryRoute string,         exitRoute string,
      callType string,
      dtstamp string,            minute int,
      second int,                IMSI string,
      IMEI string,               netCallType int,
      cellType int,              cellID string,
      netSubscriberPhone string, cellLat string,
      cellLon string)
      partitioned by (dt string, hour int)
      location '/mnt/data/correlation';

Every system has a limit on physical resources, such as cores and memory. If you're not careful in partitioning the data, a data analysis batch job can suddenly crash because memory is exhausted. What you would see in return is a plethora of errors, with no clue that memory exhaustion is the culprit.
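
The benefit of partitioning by dt and hour is easier to see once you picture the directory layout. Hive keeps each partition in a subdirectory named column=value under the table location, nested in the order the partition columns were declared. The plain-Java sketch below builds those paths for one day and then keeps only the directory that a query restricted to one date and hour would need to read. The class and method names are hypothetical, and the pruning step is a simplified picture rather than Hive's own code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mimics the column=value directory layout.
class PartitionPathSketch {

    // One directory per (dt, hour) combination under the table location.
    static String partitionPath(String tableLocation, String dt, int hour) {
        return tableLocation + "/dt=" + dt + "/hour=" + hour;
    }

    // Build the 24 hourly partition directories for a single day.
    static List<String> hourlyPartitions(String tableLocation, String dt) {
        List<String> dirs = new ArrayList<>();
        for (int hour = 0; hour < 24; hour++) {
            dirs.add(partitionPath(tableLocation, dt, hour));
        }
        return dirs;
    }

    // Keep only the directories that match a "dt = ... and hour = ..." filter.
    static List<String> prune(List<String> dirs, String dt, int hour) {
        String wanted = "/dt=" + dt + "/hour=" + hour;
        List<String> kept = new ArrayList<>();
        for (String dir : dirs) {
            if (dir.endsWith(wanted)) {
                kept.add(dir);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> dirs = hourlyPartitions("/mnt/data/correlation", "20130101");
        List<String> kept = prune(dirs, "20130101", 12);
        System.out.println(kept.size() + " of " + dirs.size() + " directories: " + kept);
    }
}
```

With hourly partitions, a query for a single hour touches one directory out of 24 for that day instead of the whole day's data.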

Describe a table in detail

Hive has a very nice feature that allows you to see details about a table, such as columns, data types, storage location of the table, size, etc. To view such information, use describe formatted with the table name, as shown in Listing 7. Since the output is huge, we show only a sample of the output fields.

Listing 7. describe formatted
hive> describe formatted correlation;
# col_name        data_type       comment
timestamp         string          None
subscriberPhone   string          None
recipientPhone    string          None
duration          int             None
# Partition Information
# col_name        data_type       comment
dt                string          None
hour              int             None
CreateTime: Mon, Jun 17 2013 PST
Location: /mnt/data/correlation

Joins (bigger table on right side)

As mentioned, a join conceptually computes a Cartesian product of the two tables and then returns those rows (with combined columns) that satisfy the join predicate. This makes a join a computationally intensive and memory-heavy operation. To use a join efficiently, keep the bigger table (in terms of rows) on the right-hand side of the join operation. For example, if table A has 1 million rows and table B has 200,000 rows, the joining part of the query should look something like hive> ... B join A ...;.

The join is a well-studied operator from relational algebra, and the order of its operands matters for how Hive executes it. In each map/reduce stage of a join, the rows of the last table in the sequence are streamed through the reducers, while the rows of the other tables are buffered in memory. With A join B, the reducers would have to buffer rows from the 1-million-row table A. With B join A, only rows from the 200,000-row table B are buffered while table A is streamed, which is much more efficient.
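
One way to picture why table order matters is a join that holds one input in memory and streams the other, so that memory use grows only with the input being held. The plain-Java sketch below illustrates that idea with a hash map keyed on the join column. It is an illustration under that assumption and not Hive's implementation. The class name, the two-field row layout, and the sample rows are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class StreamedJoinSketch {

    // Joins on the first field of each row. Rows of "buffered" are held in
    // memory; rows of "streamed" are visited once and never stored.
    static List<String> join(List<String[]> buffered, List<String[]> streamed) {
        Map<String, List<String[]>> byKey = new HashMap<>();
        for (String[] row : buffered) {
            byKey.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }
        List<String> out = new ArrayList<>();
        for (String[] row : streamed) {
            List<String[]> matches = byKey.get(row[0]);
            if (matches == null) {
                continue; // no buffered row with this key
            }
            for (String[] match : matches) {
                out.add(row[0] + "," + match[1] + "," + row[1]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The smaller table (B) is buffered, the bigger table (A) is streamed.
        List<String[]> tableB = Arrays.asList(
            new String[] {"5550001", "cellA"},
            new String[] {"5550002", "cellB"});
        List<String[]> tableA = Arrays.asList(
            new String[] {"5550001", "call1"},
            new String[] {"5550001", "call2"},
            new String[] {"5550003", "call3"},
            new String[] {"5550002", "call4"});
        for (String line : join(tableB, tableA)) {
            System.out.println(line);
        }
    }
}
```

Swapping the two arguments produces the same matches but forces the larger input into the in-memory map, which is the situation the table-order guideline is meant to avoid.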

where clause (with ranges)

Another important practice is to bound queries with where clauses. Note that we used four such conditions in our join query example. A where clause keeps a query from scanning all the data by brute force to find the answer. Think of it as dynamically partitioning the table from which the data is retrieved.
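
A prefix condition such as like '2013010112%' can also be read as a range. Assuming the timestamps are 14-digit date-and-time strings of the kind combine() produces, string order and chronological order coincide, so the prefix match is equivalent to a lower and an upper bound. The plain-Java sketch below checks that equivalence on a few values. The class name, method names, and sample timestamps are hypothetical.

```java
class RangeFilterSketch {

    // Prefix form: the timestamp starts with the date and hour.
    static boolean inHourByPrefix(String ts) {
        return ts.startsWith("2013010112");
    }

    // Range form: the same hour expressed as inclusive lower and upper bounds.
    // This relies on fixed-width digit strings sorting in time order.
    static boolean inHourByRange(String ts) {
        return ts.compareTo("20130101120000") >= 0
            && ts.compareTo("20130101125959") <= 0;
    }

    public static void main(String[] args) {
        String[] samples = {"20130101115959", "20130101120500", "20130101130000"};
        for (String ts : samples) {
            System.out.println(ts + " prefix=" + inHourByPrefix(ts)
                + " range=" + inHourByRange(ts));
        }
    }
}
```

Only the middle sample falls inside the hour, and both forms agree on all three values.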

Using standard functions such as collect_set()

Apart from many useful standard aggregate functions provided by Hive, such as count(), sum(), min(), max(), etc., it also provides a collection function called collect_set(). It returns a set of objects with duplicate elements eliminated. For example, to see distinct users who registered to a cellular tower during midday on 01 Jan 2013, the query in Listing 8 can be used.

Listing 8. collect_set() function
hive> select cellID, collect_set(subscriberPhone) from correlation
      where dt='20130101' and hour=12 group by cellID;
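
The effect of grouping by cellID and applying collect_set() can be pictured as building one duplicate-free set of phone numbers per tower. The plain-Java sketch below does this over a few in-memory rows. It uses sorted collections only to make the printed result predictable, which is a choice made for this example and says nothing about the order Hive returns. The class name and sample rows are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class CollectSetSketch {

    // Each row is {cellID, subscriberPhone}. Returns the distinct phone
    // numbers seen at each cell, like "group by cellID" with collect_set().
    static Map<String, Set<String>> collectSet(List<String[]> rows) {
        Map<String, Set<String>> byCell = new TreeMap<>();
        for (String[] row : rows) {
            byCell.computeIfAbsent(row[0], k -> new TreeSet<>()).add(row[1]);
        }
        return byCell;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[] {"cell-17", "5550001"},
            new String[] {"cell-17", "5550002"},
            new String[] {"cell-17", "5550001"}, // duplicate, dropped by the set
            new String[] {"cell-42", "5550003"});
        // prints {cell-17=[5550001, 5550002], cell-42=[5550003]}
        System.out.println(collectSet(rows));
    }
}
```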


Hive provides a cast() function for typecasting, for example from string to integer or from string to double, and vice versa. Typecasting is the best way to make sure a comparison behaves exactly as intended. Listing 9 shows the general form of cast(). The type can be int, bigint, float, double, or string. In the examples shown, a string is converted to an integer, and a double is converted to a string.

Listing 9. cast() function
cast(expression as <type>)
hive> cast('1' as int)
hive> cast(75.89 as string)
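
Why casting matters for comparisons is easy to demonstrate: two values stored as strings compare character by character, so '9' sorts after '10', while the same values compared as integers give the expected order. The plain-Java sketch below shows the difference, along with a double-to-string conversion. It is an analogy in Java rather than Hive code, and the class and method names are hypothetical.

```java
class CastSketch {

    // Lexicographic comparison, as happens when both values are strings.
    static boolean lessAsStrings(String a, String b) {
        return a.compareTo(b) < 0;
    }

    // Numeric comparison after converting both strings to integers.
    static boolean lessAsInts(String a, String b) {
        return Integer.parseInt(a) < Integer.parseInt(b);
    }

    public static void main(String[] args) {
        System.out.println(lessAsStrings("9", "10")); // prints false
        System.out.println(lessAsInts("9", "10"));    // prints true
        System.out.println(String.valueOf(75.89));    // prints 75.89
    }
}
```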


In this article, we have shown how to use Hive for analyzing large datasets using Hadoop as a back end. You learned what the schema of a large production dataset might look like. Using a single example, we explained how to join two large datasets to form a correlation dataset. To keep huge amounts of correlation data, we learned how to create a correlation table with partitions. We then walked through the best practices for using Hive efficiently and effectively.

Related topics

