Integrate Hadoop with an existing RDBMS

A design approach

The Apache™ Hadoop® environment includes components such as MapReduce, HDFS, Pig, Hive, Oozie, Sqoop, and Flume. These Hadoop tools and technologies were designed to handle big data. After you import your data into Hadoop, you can use tools such as Jaql and R to analyze that data.

To define a high-level system design for how to integrate Hadoop, specifically the Hadoop Distributed File System (HDFS), with a legacy relational database management system (RDBMS), I use a loan application system as a case study. In this loan application process, a loan applicant approaches a loan officer in the bank to apply for a loan. The applicant supplies bank account details, and the loan officer uses them to check the activity on the account in a banking application system. Based on this manual assessment, the loan officer decides to either accept or reject the application. If accepted, the applicant submits the loan application to the bank, and the application goes through a series of approvals until it is finally approved and the loan is disbursed to the applicant.

Typically, predictive and prescriptive analyses on big data are process-intensive operations. These process-intensive operations should be run on the data in the HDFS in the Hadoop cluster (and not on the data in an RDBMS). Because of the storage and parallelism capabilities of HDFS, and because a Hadoop cluster can run optimally on many different readily available hardware systems, this integrated design:

  • Scales well
  • Provides cheaper storage options
  • Improves the performance of our overall big data environment

Because Hadoop can store more data, and store more types of data (both structured and unstructured), the data analyses are more intelligent and ultimately help us make better business decisions.

Setting up a Hadoop cluster

Use the information that is available with the Hadoop solution you are using to set up your Hadoop cluster. Some of the well-known standard solutions include IBM Open Platform with IBM BigInsights for Apache Hadoop from IBM, CDH from Cloudera, and Hortonworks Data Platform (HDP) from Hortonworks.

Review the "Big data architecture and patterns" series of IBM developerWorks articles, which presents a structured and pattern-based approach to simplify the task of defining an overall big data architecture. Refer to blogs and communities about big data for more ideas and support.

Based on my experiences and research, these hints and tips can help you set up a secure, scalable Hadoop cluster:

  • Always create a dedicated Hadoop user account before you set up the Hadoop architecture. This account separates the Hadoop installation from other services that are running on the same machine.
  • Avoid using RAID (Redundant Array of Independent Disks) in your configuration for the following reasons:
    • The redundancy that is provided by RAID is not needed because HDFS provides redundancy by default with its replication between nodes.
    • Failure of a disk in a RAID configuration affects the whole disk array, which causes the node to become unavailable. However, in HDFS the node will continue to operate without the failed disk.
    • In RAID systems, the speed of read and write operations is limited by the speed of the slowest disk in the array. But, HDFS uses a JBOD (Just a Bunch of Disks) configuration, where the disk operations are independent, so the average speed is greater than the slowest disk.
  • Calculate the amount of storage you need by multiplying the expected size of the data that you will collect over a given time by the HDFS replication factor (plus headroom for intermediate data), and then size the number of nodes accordingly.
  • To get the maximum performance in a multi-rack Hadoop cluster, map your nodes to racks by using the topology.node.switch.mapping.impl configuration property.
  • Ensure that there is plenty of memory on the namenode. The namenode usually has high memory requirements because it holds file and block metadata for the entire namespace in memory. The secondary namenode needs comparable memory because it periodically merges the namenode's edit log into a checkpoint copy of that metadata.
  • While you can put both the namenode and the secondary namenode on the same node, some scenarios require that you put them on separate nodes. For example, you place them on separate nodes to aid in recovery in case of loss (or corruption) of all the metadata files for the namenode.
  • Put the jobtracker on a dedicated node because its memory requirements grow with the number of MapReduce jobs and tasks that it coordinates.
  • When the namenode and jobtracker are on separate nodes, synchronize their slave files, because each node in the cluster needs to run a datanode and a tasktracker.
  • Review the audit logs for the namenode, the jobtracker, and the resource manager; HDFS also has its own audit log.
  • To control the memory footprint and CPU usage during MapReduce operations, set the following properties (an illustrative configuration sketch follows this list):
    • mapred.tasktracker.map.tasks.maximum. This property controls the number of Map tasks that run on a tasktracker at one time.
    • mapred.tasktracker.reduce.tasks.maximum. This property controls the number of Reduce tasks that run on a tasktracker at one time.
    • mapred.child.java.opts. This property defines how much memory is allocated to each child JVM in which these Map or Reduce tasks run.
  • For files that will not be frequently updated, use the DistributedCache facility to cache these files in your MapReduce programs.
  • You can also adjust the memory requirements for the namenode and secondary namenode by using the HADOOP_NAMENODE_OPTS and HADOOP_SECONDARYNAMENODE_OPTS parameters in the hadoop-env.sh script.
  • Configure a security architecture in your Hadoop cluster, including these tools or technologies:
    • By default, Hadoop determines user identity with the whoami command (which is available in most UNIX operating systems) for authorization and authentication. Unfortunately, this identity can easily be impersonated by any user who has credentials for the operating system. To improve the security of your Hadoop cluster, configure it to use the Kerberos authentication mechanism, and supplement the base Kerberos mechanism with security tokens.
    • Make sure that Kerberos is implemented in the Hadoop cluster. Kerberos initiates and establishes the handshake between the client (our J2EE application) and the Hadoop cluster. Subsequent calls to services, file accesses, and jobs are then handled securely with security tokens (delegation tokens, block access tokens, and job tokens). This approach also minimizes the load on the Kerberos Key Distribution Center, which would otherwise have to issue credentials for every task.
    • Set up the Oozie workflow engine. Oozie workflow jobs require trust tokens to be set up so that the jobs can run on behalf of the user initiating the workflow.
    • Configure SPNEGO, which is included with Hadoop, for authentication for web interfaces like Hue. For confidentiality in Hadoop, data is only pseudo-encrypted by the built-in compression formats, so consider additional encryption if you need stronger guarantees.

    For more information about implementing security in your Hadoop cluster, read the "Adding Security to Apache Hadoop" technical report.

  • Benchmark your Hadoop cluster after setting it up.
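
As an illustration of the memory and CPU tuning settings from the list above, the mapred-site.xml fragment and hadoop-env.sh lines might look like the following sketch. The values shown are placeholders, not recommendations; choose them based on the cores and RAM that are available on your tasktracker nodes and on the size of your namespace.

<!-- mapred-site.xml (MRv1); the values are illustrative only -->
<configuration>
  <property>
    <!-- Map task slots per tasktracker -->
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>4</value>
  </property>
  <property>
    <!-- Reduce task slots per tasktracker -->
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>2</value>
  </property>
  <property>
    <!-- Heap size for each child JVM that runs a Map or Reduce task -->
    <name>mapred.child.java.opts</name>
    <value>-Xmx1024m</value>
  </property>
</configuration>

# hadoop-env.sh; illustrative heap sizes for the namenode daemons
export HADOOP_NAMENODE_OPTS="-Xmx4096m $HADOOP_NAMENODE_OPTS"
export HADOOP_SECONDARYNAMENODE_OPTS="-Xmx4096m $HADOOP_SECONDARYNAMENODE_OPTS"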

Moving data into your Hadoop Distributed File System (HDFS)

Now that the Hadoop cluster is set up, it is time to import the data. In the loan application use case, I need to integrate the Hadoop cluster with the existing banking application, which uses an RDBMS as its data storage. Next, data must be moved between the new HDFS and the existing RDBMS. HDFS can either replace data warehouses or act as a bridge between structured and unstructured data and those existing data warehouses.

Based on my experiences and research, these hints and tips can help you set up your HDFS:

  • Design operations to be idempotent: an operation in HDFS should produce the same result no matter how many times it is run.
  • Before you move data into HDFS, aggregate small files into larger ones so that the namenode holds less file and block metadata in memory and your MapReduce programs create fewer map tasks.
  • Before you move data into HDFS, convert it into a format that is suitable for the target system.
  • Implement retry and failover logic so that an operation is tried again if it fails, for example when a node is unavailable.
  • Verify that data was not corrupted when it was transferred across the network.
  • Control the amount of parallelism that is used and limit the number of MapReduce programs that are run. Both of these capabilities impact resource consumption and performance.
  • Monitor your operations to ensure that they are successful and produce the expected result.

While you can use MapReduce programs to import the data into HDFS and to apply design patterns such as filtering, partitioning, or job chaining, I recommend that you use Sqoop to transfer the data. Sqoop is driven by commands and configuration files that are faster to deploy and update in a production environment.

The first step is to install Sqoop. Use the Apache Sqoop documentation for information about how to install Sqoop in your Hadoop cluster.

After Sqoop is installed, use this simple line of code to import data into your HDFS, specifying your own JDBC connection information for your RDBMS database:

sqoop import --username appuser --password apppassword \
  --connect jdbc:mysql://server/banking_app \
  --table transactions

Of course, the credentials can be stored in a file and referenced with the --options-file parameter instead. The jdbc:mysql://server/banking_app URL is the JDBC connection string for the database that I'm using in the loan application use case to demonstrate this Hadoop environment.
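
For example, an options file holds one option or value per line and supports # comments, which keeps the credentials out of the command line and out of the shell history. Here is a minimal sketch, assuming a file named import-transactions.options (a hypothetical name):

# import-transactions.options: one option or value per line
import
--username
appuser
--password
apppassword
--connect
jdbc:mysql://server/banking_app
--table
transactions

You can then run the same import with sqoop --options-file import-transactions.options.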

With the Sqoop implementation, data is imported into HDFS from the RDBMS at intervals, because there are large amounts of data. If this part of the system were designed to run as a real-time operation, users would have to wait too long for a request to complete. Also, because you don't want to import repeated or stale data, you can design the jobs to pull data by using time-based criteria. That is, each job fetches only the most recent data from the RDBMS in the banking application.
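
One way to implement that incremental pull is Sqoop's incremental import mode. The following sketch assumes that the transactions table has a monotonically increasing id column, and the --last-value shown is illustrative. Sqoop imports only the rows whose id is greater than the supplied --last-value and prints the value to use on the next run; a saved Sqoop job (sqoop job --create ...) can track that value for you.

sqoop import --connect jdbc:mysql://server/banking_app \
  --table transactions \
  --incremental append \
  --check-column id \
  --last-value 1500000 \
  --target-dir /data/transactions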

In the MapReduce design, the Oozie workflow engine runs your MapReduce programs as managed jobs. This implementation is shown in the Oozie workflow configuration file in Listing 1.

Listing 1. Oozie workflow configuration file
<workflow-app xmlns="uri:oozie:workflow:0.1" name="processData">
    <start to="loadTrxns"/>
    <action name="loadTrxns">
        <map-reduce>
            <job-tracker>${jobtracker}</job-tracker>
            <name-node>${namenode}</name-node>
            <configuration>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.map.class</name>
                    <value>com.mappers.DataIngressMapper</value>
                </property>
                <property>
                    <name>mapreduce.reduce.class</name>
                    <value>com.reducers.DataIngressReducer</value>
                </property>
                <!-- ... -->
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Error</message>
    </kill>
    <end name="end"/>
</workflow-app>
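
To run this workflow, a typical approach is to copy the workflow definition to HDFS and submit it with the Oozie command-line client, resolving the ${jobtracker} and ${namenode} parameters from a job properties file. The following is a sketch; the host names, ports, and HDFS paths are assumptions for this example.

# job.properties (illustrative values)
namenode=hdfs://localhost:8020
jobtracker=localhost:8021
oozie.wf.application.path=${namenode}/apps/processData

# copy the workflow definition to HDFS, then submit and start the job
hadoop fs -mkdir -p /apps/processData
hadoop fs -put workflow.xml /apps/processData/workflow.xml
oozie job -oozie http://localhost:11000/oozie -config job.properties -run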

Sometimes, though, to achieve a higher level of automation and effectiveness, you might need to run the workflow job (as configured in the Oozie configuration file in Listing 1) based on parameters such as time intervals, data availability, or external events. The Oozie coordinator configuration file (as configured in Listing 2) allows us to initiate a workflow job based on these parameters.

Listing 2. Oozie coordinator configuration file
<coordinator-app name="schedule_dataIngress" frequency="${frequency}"
    start="${start}" end="${end}" timezone="${timezone}"
    xmlns="uri:oozie:coordinator:0.1">
    <action>
        <workflow>
            <app-path>${workFlowpath}</app-path>
            <configuration>
                <property>
                    <name>queueName</name>
                    <value>default</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>
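
The ${frequency}, ${start}, ${end}, ${timezone}, and ${workFlowpath} placeholders are resolved from the coordinator's job properties when you submit it with the same oozie job -config ... -run command as before. A minimal sketch of such a properties file follows; the values and HDFS paths are illustrative.

# coordinator job.properties (illustrative values)
# with coordinator 0.1, frequency is expressed in minutes; 1440 = once a day
frequency=1440
start=2015-04-13T01:00Z
end=2016-04-13T01:00Z
timezone=UTC
workFlowpath=hdfs://localhost:8020/apps/processData
oozie.coord.application.path=hdfs://localhost:8020/apps/coordinator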

Modifying the configuration files for Sqoop

You can use these two configuration files as-is if you are using MapReduce programs to import the data. If you use Sqoop instead, replace the map-reduce action in the Oozie workflow configuration file with a sqoop action, as shown in Listing 3:

Listing 3. Oozie configuration file with changes for Sqoop
<action name="loadTrxnsSqoop">
    <sqoop xmlns="uri:oozie:sqoop-action:0.2">
        <job-tracker>${jobtracker}</job-tracker>
        <name-node>${namenode}</name-node>
        <configuration>
            <property>
                <name>mapred.compress.map.output</name>
                <value>true</value>
            </property>
        </configuration>
        <!-- Use arg elements (rather than a single command element) because the
             free-form query contains spaces. Sqoop also requires the $CONDITIONS
             token in a free-form query. -->
        <arg>import</arg>
        <arg>--connect</arg>
        <arg>jdbc:mysql://localhost:3333/banking_app</arg>
        <arg>--query</arg>
        <arg>select * from transactions t join accounts a on t.account_id = a.id where $CONDITIONS</arg>
        <arg>--target-dir</arg>
        <arg>hdfs://localhost:8020/data/loanTrxns</arg>
        <arg>-m</arg>
        <arg>1</arg>
    </sqoop>
    <ok to="end"/>
    <error to="fail"/>
</action>

Analyzing data in our Hadoop environment

Now that you have imported data into the Hadoop cluster, you can perform data analytics on it. Usually, a data scientist uses various analytics tools or applications. In my loan application use case, I will write a simple analytics program. Although Pig and Hive are popular tools to do data analytics, I recommend that you use R. Here's why:

  • Unlike Pig and Hive, where you have to write user-defined functions (UDFs) to implement machine learning algorithms, R has built-in functions that fit machine learning models to your data. With these functions, the program can observe and learn the patterns in the data and gain insights about it, and the application can then decide what actions to take. So, in my loan application use case, by observing the trends in the applicant's financial transaction data, the application can determine whether the applicant is eligible for the loan, instead of the loan officer manually analyzing the data and making the decision.
  • By using RHadoop, you can easily integrate the powerful but simple data analysis capabilities of R on top of the storage and parallelism capabilities of Hadoop. And, if you write the front-end data analysis application in Java, you can use the mature rJava (JRI) interface that integrates R with Java seamlessly so you can call your R functions from within your Java applications.

In my loan application use case, the loan applicants need to be grouped into those who defaulted on their loans (defaulters) and those who paid their loans (non-defaulters), so I design the application as a binary classification problem and implement supervised learning. Because the data needs to be grouped by using multiple parameters, I implement the logistic regression algorithm. The input data will likely contain parameters, or features, such as the loan amount, monthly deductions, whether the applicant collected a loan before, completed payments, whether the applicant has defaulted, and so on. I remove features that can be derived from other features so that I don't have so many features that they lead to overfitting, which is the unintentional modeling of chance variations in the data and produces models that do not work well when applied to other data sets. So, in this use case, I might implement the loanAppScript.R script in RHadoop, and it might look like the script in Listing 4.

Listing 4. Example RHadoop script for the loan application
# point RHadoop at the Hadoop installation
Sys.setenv(HADOOP_HOME="/home/hadoop")
Sys.setenv(HADOOP_CMD="/home/hadoop/bin/hadoop")
library(rhdfs)
library(rmr2)
hdfs.init()
# read the ingested loan data from HDFS (from.dfs returns key-value pairs)
loanData <- values(from.dfs("/data/tmp/loanData23062014"))
# fit a logistic regression model on features F1 through F7
loanAppModel <- glm(eligible ~ F1 + F2 + F3 + F4 + F5 + F6 + F7,
                    data=loanData, family="binomial")
# score new applicant data (testData is supplied by the caller; see Listing 6)
predict(loanAppModel, testData, type="response")

In this script, F1 through F7 are the features of the loanData that was retrieved from the Hadoop cluster after the ingress operation, and testData is sample data that contains a loan applicant's values for those features. When the script is run, predict() with type="response" returns a probability between 0 and 1; thresholding that probability (for example, at 0.5) classifies the applicant as a non-defaulter or a defaulter. Further analysis, such as examining the odds ratios, model coefficients, and confidence intervals, can then determine the accuracy of the model and the level of impact that each feature has on the outcome of the analysis.
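
For example, the following standard R functions report those diagnostics for the fitted model; this snippet assumes that loanAppModel was fit as in Listing 4.

summary(loanAppModel)       # coefficients, standard errors, and p-values
exp(coef(loanAppModel))     # odds ratios for each feature
confint(loanAppModel)       # confidence intervals for the coefficients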

Next, I need to integrate the R script into the J2EE banking application by using the rJava (JRI) framework. However, because I am calling an R script file, the code might look something like the Java code in Listing 5.

Listing 5. Java code for integrating our RHadoop script into the J2EE application
// Rengine and REXP come from the rJava JRI library; ClassPathResource is Spring's resource loader
public void predictLoanApp() throws IOException {
    Rengine engine = Rengine.getMainEngine();
    ClassPathResource rScript = new ClassPathResource("loanAppScript.R");
    // run the R script, which fits the model and produces the prediction
    REXP result = engine.eval(String.format("source('%s')", rScript.getFile().getAbsolutePath()));
    Double predictedValue = result.asDouble();
}

Also, because I am passing parameters that represent test data into the function, the Java function call will look like the Java code in Listing 6.

Listing 6. Java function for passing parameters
// LoanFeatures is a hypothetical name for the POJO that carries the applicant's
// feature values (getX1() through getX7())
public void predictLoanApp(LoanFeatures obj) throws IOException {
    Rengine engine = Rengine.getMainEngine();

    ClassPathResource rScript = new ClassPathResource("loanAppScript.R");
    engine.eval(String.format("source('%s')", rScript.getFile().getAbsolutePath()));

    // build the test data frame in R from the feature values of the POJO
    engine.eval(String.format("testData <- data.frame(F1=%s,F2=%s,F3=%s,F4=%s,F5=%s,F6=%s,F7=%s)",
            obj.getX1(), obj.getX2(), obj.getX3(), obj.getX4(), obj.getX5(), obj.getX6(), obj.getX7()));

    REXP result = engine.eval("predict(loanAppModel,testData,type=\"response\")");
    Double predictedValue = result.asDouble();
}

The obj object is a plain old Java object (POJO) that holds the values of the features to be tested. A Java function like this one can be plugged into the existing J2EE application at the service layer and used by any of the view technologies that are available in J2EE, such as JSP or JSF. To ensure the security of the data, use the native J2EE security mechanisms, such as JAAS, and for data confidentiality, use standard SSL protocols.

Lastly, to capture the requests that come in to the J2EE application, use Flume, which is one of the tools in the Hadoop environment. Here, I configure a Flume agent that uses the J2EE application server log as its source and HDFS as its sink. The Flume configuration file might look like the snippet in Listing 7.

Listing 7. Flume configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /opt/jboss/standalone/log/server.log

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:8020/data/tmp/system.log/
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel that buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Then, I can start the agent by using a command similar to the following command:

bin/flume-ng agent --conf conf --conf-file loanApp.conf --name a1 -Dflume.root.logger=INFO,console

Conclusion

While this tutorial focused on one set of data in one Hadoop cluster, you can configure the Hadoop cluster to be one unit of a larger system, connecting it to other Hadoop clusters or even to the cloud for broader interactions and greater data access. With this big data infrastructure and its data analytics application, the implemented algorithms learn from more data, at rest or streaming, which allows you to make better, quicker, and more accurate business decisions.

