内容


将 Hadoop 与现有的 RDBMS 相集成

一种设计方法

Comments

Apache™ Hadoop® 环境提供了一些组件,比如 MapReduce、HDFS、Pig、Hive、Oozie、Sqoop 和 Flume,等等。这些 Hadoop 工具和技术是为了处理大数据而设计的。在将数据导入 Hadoop 后,您可以使用诸如 Jaql 和 R 之类的工具来分析和使用这些数据。

为了给如何集成 Hadoop(特别是 Hadoop 分布式文件系统,即 HDFS)与传统关系数据库管理系统 (RDBMS) 定义一个高级系统设计,我使用了一个贷款申请系统作为一个案例。在这个贷款申请过程中,贷款申请人接洽了银行中的一名信贷员来申请贷款。贷款申请人提供了银行资料,信贷员使用这些资料在银行应用程序系统中检查银行帐户上的活动。基于这个人工评估,信贷员决定批准或拒绝贷款申请。如果获得批准,申请人应向银行提交贷款申请。贷款申请会通过一系列的审批,直到最后获得批准,贷款被支付给申请人。

通常,对大数据的预测和说明性分析是过程密集型操作。这些过程密集型操作应该对 HDFS 集群的 Hadoop 中的数据运行(而不是对 RDBMS 中的数据运行)。因为 HDFS 的存储和并行处理能力,而且因为 Hadoop 集群可以采用最佳方式在许多不同的现有可用硬件系统上运行,所以这种集成式设计:

  • 具有良好的可扩展性
  • 提供了成本更低的存储选项
  • 可以提高我们的大数据环境的整体性能

因为 Hadoop 可以存储更多的数据,而且可以存储更多类型的数据(结构化和非结构化数据),所以数据分析会更智能,并最终帮助我们制定更好的业务决策。

设置一个 Hadoop 集群

使用您正在使用的 Hadoop 解决方案中的可用信息来设置您的 Hadoop 集群。一些众所周知的标准解决方案包括:来自 IBM 的 IBM Open Platform with IBM BigInsights for Apache Hadoop、来自 Cloudera 的 CDH,以及来自 Hortonworks 的 Hortonworks Data Platform (HDP) 。此外,还可以使用参考资料中列出的书籍。

回顾 IBM developerWorks 文章 "大数据架构和模式" 系列,该系列介绍了一种结构化的、基于模式的方法来简化定义整体大数据架构的任务。参见有关大数据的博客和社区,获得更多的想法和支持。

根据我的经验和研究,这些提示和技巧可以帮助您建立一个安全的、可扩展的 Hadoop 集群:

  • 在设置 Hadoop 架构之前总是创建一个专用的 Hadoop 用户帐户。该帐户将 Hadoop 安装与同一台机器上运行的其他服务分离。
  • 避免在您的配置中使用 RAID(独立磁盘冗余阵列)的原因如下:
    • RAID 提供的冗余是不必要的,因为 HDFS 默认情况下为节点之间的复制提供了冗余。
    • RAID 配置中的磁盘故障会影响整个磁盘阵列,导致节点不可用。然而,在 HDFS 中,节点将继续执行操作,不受故障磁盘影响。
    • 在 RAID 系统中,读写操作的速度受到阵列中最慢磁盘的速度的限制。但是,HDFS 使用了 JBOD(只是一群磁盘)配置,该配置中的磁盘操作是单独的,所以平均速度要高于最慢的磁盘。
  • 计算您需要的数据存储量:用节点数量乘以您在给定时间内预计将会收集的数据的大小。
  • 在多个机架的 Hadoop 集群中获得最大性能,通过使用 topology.node.switch.mapping.impl 配置属性将您的节点映射到这些机架。
  • 确保主节点(namenode)上有足够的内存。主节点通常具有较高的内存需求,因为它将整个名称空间的文件和数据块元数据都保存在内存中。次级主节点拥有主节点的副本或镜像。
  • 虽然您可以将主节点和次级主节点放在同一个节点上,但某些场景要求您将它们放在单独的节点上。例如,将它们放在单独的节点来帮助您恢复主节点丢失(或损坏)的所有元数据文件。
  • 将 jobtracker 放在一个专用的节点上,因为 jobtracker 运行的 MapReduce 作业可能在增加。
  • 当主节点和 jobtracker 在不同的节点上时,同步它们的从属文件,因为集群中的每个节点都需要运行一个 Datanode 和一个 Tasktracker。
  • 检查主节点和 jobtracker 的审计日志及资源管理器。HDFS 有自己的审计日志。
  • 要控制执行 HDFS 操作期间的记忆痕迹和 CPU 使用率,可以设置以下属性:
    • mapred.tasktracker.map.tasks.maximum:该属性控制同一时间运行在 tasktracker 上的 Map 任务的数量。
    • mapred.tasktracker.reducer.tasks.maximum:该属性控制同一时间运行在 tasktracker 上的 Reduce 任务的数量。
    • mapred.child.java.opts:该属性定义了分配给这些 Map 任务和 Reduce 任务生成的每个子 JVM 的内存数量。
  • 对于不会频繁更新的文件,可以使用 DistributedCache 工具在 MapReduce 程序中缓存这些文件。
  • 您还可以在 hadoop-env.sh 脚本中使用 HADOOP_NAMENODE_OPTS 和 HADOOP_SECONDARYNAMENODE_OPTS 参数来调整主节点和次要主节点的内存需求。
  • 在您的 Hadoop 集群中配置一个安全架构,包括以下这些工具或技术:
    • 默认情况下,Hadoop 使用 whoami 工具(在大多数 UNIX 操作系统中可用)来实现授权和身份验证。不幸的是,这个工具很容易被知道操作系统凭证的任何用户模拟。不过,为了提高您的 Hadoop 集群的安全性,可以配置您的 Hadoop 集群来使用 Kerberos 身份验证机制。同时,还需要使用安全令牌来补充基本的 Kerberos 身份验证机制。
    • 确保 Kerberos 是在 Hadoop 集群中实现的。Kerberos 在客户端(我们的 J2EE 应用程序)与 Hadoop 集群之间启动和建立握手协议。对服务、文件访问和作业的后续调用是通过安全令牌(委托令牌、块访问令牌和作业令牌)来安全处理的。这种方法还可以减少已创建的任务将在 Kerberos 密钥分发中心拥有的负载,如果该负载是在每个任务一个令牌的基础上生成的。
    • 设置 Oozie 工作流引擎。Oozie 工作流作业需要建立可靠的令牌,以便可以代表启动该工作流的用户来运行作业。
    • 配置 Spengo,它包含在 Hadoop 中,用于对 Web 接口(比如 Hue)进行身份验证。为了实现 Hadoop 中的保密性,数据通过使用内置的压缩格式进行了伪加密(pseudo-encrypted)。

    关于在您的 Hadoop 集群中实现安全性的更多信息,请参阅 "增加 Apache Hadoop 项目的安全性" 技术报告。

  • 在设置您的 Hadoop 集群后对其进行基准测试。

将数据移动到您的 Hadoop 分布式文件系统 (HDFS)

现在,我们已经建立了 Hadoop 集群,是时候来导入数据了。在贷款申请用例中,需要将 Hadoop 集群与现有银行应用程序相集成,该应用程序使用 RDBMS 作为它的数据存储。接下来,必须在新的和现有的 RDBMS 之间移动 HDFS 数据。HDFS 可以取代数据仓库,或者充当结构化和非结构化数据与现有数据仓库之间的桥梁。

根据我的经验和研究,以下这些提示和技巧可以帮助您建立您的 HDFS:

  • HDFS 中的某个操作应该无论运行多少次都会产生相同的结果。
  • 在将数据移动到 HDFS 中之前,汇集这些数据,以便 MapReduce 程序使用最少量的内存来转换数据,尤其在主节点上时。
  • 在将数据移动到 HDFS 中之前,将它从一种格式转换为另一种格式,使其适用于目标系统。
  • 实现故障转移,这样就可以在操作失败时再次尝试,例如在一个节点不可用时。
  • 确认在通过网络传输数据时数据没有被损坏。
  • 控制所执行的并行操作的数量,限制运行的 MapReduce 程序的数量。这两种功能都会影响资源使用和性能。
  • 监视您的操作,确保取得成功并产生预期的结果。

尽管可以使用 MapReduce 程序将数据导入到 HDFS 中,还可以使用 MapReduce 程序来应用设计模式,比如过滤、分区或作业衔接,我建议您使用 Sqoop 来传输数据。Sqoop 使用了一些配置文件,这些文件能够更快地部署,更快地更新到一个生产环境中。

第一步是安装 Sqoop。使用 Apache Sqoop 文档 来获得有关如何在您的 Hadoop 集群中安装 Sqoop 的信息。

在安装 Sqoop 之后,可以使用这个简单的代码行将数据导入您的 HDFS,为 RDBMS 数据库指定您自己的 JDBC 连接信息:

sqoop import --username appuser --password apppassword 
--connect jdbc:mysql://server/banking_app 
--table transactions

当然,凭证可以存储在一个文件中,并使用 –options-file 参数进行引用。jdbc:mysql://server/banking_app 是用来实现 JDBC 连接的数据库 URL,我们将在贷款申请用例中使用它来演示这个 Hadoop 环境。

借助 Sqoop 实现,可以将数据按一定的时间间隔从 RDBMS 导入到 HDFS 中,因为有大量的数据。如果系统的这一部分被设计用于实时操作,那么用户必须等待很长时间来运行一个操作或请求。另外,因为您不想导入重复的数据或陈旧的数据,所以您可以使用一个基于时间的标准来设计提取数据的作业。也就是说,该作业只从银行应用程序的 RDBMS 数据中获取最新数据。

在 MapReduce 设计中,Oozie 工作流引擎允许您的 MapReduce 程序按照指定时间间隔如期运行作业。清单 1 中的 Oozie Workflow 配置文件中显示了这个实现。

清单 1. Oozie 工作流配置文件
<workflow-app xmlns='uri:oozie:workflow:0.1' name='processData'>
<start to='loadTrxns' />
<action name="loadTrxns">
<map-reduce>
<job-tracker>${jobtracker}</job-tracker>
<name-node>${namenode}</name-node>
<configuration>
<property>
<name>mapred.mapper.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.reducer.new-api</name>
<value>true</value>
</property>
<property>
<name>mapreduce.map.class</name>
<value>com.mappers.DataIngressMapper</value>
</property>
<property>
<name>mapreduce.reduce.class</name>
<value>com.reducers.DataIngressReducer</value>
</property>
.
.
.
.
</configuration>
</map-reduce>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Error</message>
</kill>
</start>
<end name="end"/>
</workflow>

有时候,为了实现更高级别的自动化和有效性,您可能需要基于参数运行工作流作业(正如 清单 1 中的 Oozie 配置文件中所配置的那样),这些参数包括时间间隔、数据可用性或外部事件。Oozie 协调器配置文件(正如 清单 1 中所配置的那样)允许我们基于这些参数启动一个工作流作业。

清单 2. Oozie 协调器配置文件
<coordinator-app name="schedule_dataIngress" frequency="${frequency}" 
start="${start}" end="${end}" timezone="${timezone}" 
xmlns="uri:oozie:coordinator:0.1">
<action>
<workflow>
<app-path>${workFlowpath} </app-path>
<configuration>
<property>
<name>queueName</name>
<value>default</value>
</property>
</configuration>
</workflow>
</action>
</cordinator-app>

修改 Sqoop 的配置文件

如果正在使用 MapReduce 程序,那么可以使用这两个配置文件,但是,如果正在使用 Sqoop,则需要对 Oozie 工作流配置文件执行 清单 3 中所示的以下修改:

清单 3. 包含针对 Sqoop 的更改的 Oozie 配置文件
<action name="loadTrxnsSqoop">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobtracker}</job-tracker>
<name-node>${namenode}</name-node>
<configuration>
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
</configuration>
<command>import --connect jdbc:mysql://localhost:3333/banking_app 
--query select * from transactions t join accounts a on t.account_id = 
a.id --target-dir hdfs://localhost:8020/data/loanTrxns -m 1</command>
</sqoop>
<ok to="end"/>
<error to="fail"/>
</action>

分析我们的 Hadoop 环境中的数据

现在,您已经将数据导入到 Hadoop 集群中,您可以对这些数据进行数据分析。通常,数据科学家会使用各种分析工具或应用程序。在我的贷款申请用例中,我将编写一个简单的分析程序。尽管 Pig 和 Hive 是执行数据分析的流行工具,但我建议您使用 R。原因如下:

  • 与 Pig 和 Hive 不同,您必须编写 UDF 函数来实现机器学习算法,在 R 中,您可以使用内置函数让机器学习算法模型适用于我们的数据。借助这些函数,程序可以观察和学习数据中的模式,获得有关数据的洞察,然后应用程序可以决定应采取的行动。在我的贷款申请用例中,通过观察申请人的金融交易数据中的趋势,应用程序可以确定申请人是否有资格获得贷款,而不是通过信贷员手动分析和制定决策。
  • 通过使用 RHadoop,您可以轻松地集成 R 在存储方面的功能:强大而又简单的数据分析功能和 Hadoop 的并行处理功能。如果您用 Java 编写前端数据分析应用程序,那么您可以使用成熟的 rJava (JRI) 接口,该接口将 R 与 Java 无缝集成在一起,这样您就可以在您的 Java 应用程序中调用 R 函数。

在我的贷款申请用例中,因为贷款申请者需要划分为拖欠贷款的人(拖欠者)和已支付贷款的人(非拖欠者),所以我将该应用程序设计为一个二进制分类应用程序,而且我将实施监督学习。此外,因为需要使用多个参数对数据进行分组,我将实现逻辑回归算法。输入数据可能包含一些针对贷款金额、每月扣除款、以前收集的贷款、已完成支付、违约等款项的参数或特性。我可以丢弃来自其他特性的一些特性,所以我没有提供太多的特性,这些特性可能会导致过度拟合(overfitting),过度拟合是数据中随机变异的无意识建模,会导致这些模型在应用于其他数据集时无法很好地工作。所以,在此用例中,我可能会在 RHadoop 中实现 loanAppScript.R 脚本,它看起来可能类似于 清单 4 中的脚本。

清单 4. 贷款应用程序的示例 RHadoop 脚本
Sys.setenv(HADOOP_HOME="/home/hadoop")
Sys.setenv(HADOOP_CMD="/home/hadoop/bin/hadoop")
library(rhdfs)
library(rmr2)
hdfs.init()
loanData <- from.hdfs("/data/tmp/loanData23062014")
loanAppModel <- glm(eligible ~ F1 + F2 + F3 + F4 +.....+ F7, data=loanData,family="binomial")
predict(loanAppModel,testData,type="response")

在此脚本中,F1,.......,F7loanData 的特性,是在执行 ingress 操作后从 Hadoop 集群中获得的,testData 是示例数据,提供了贷款申请人的详细信息,包括特性的值。在运行此脚本时,结果要么是 0 要么是 1,它指定了贷款申请人是拖欠者还是非拖欠者。该脚本还返回了一个概率值。进一步分析这些值,比如比值比(odds ratio)、模型系数和置信区间,然后可以计算这些值,确定我们的模型准确性级别,以及每个特性对分析结果的影响级别。

接下来,我需要使用 rJava (JRI) 框架将 R 脚本集成到 J2EE 银行应用程序中。不过,因为我调用了一个 R 脚本文件,该代码看起来可能类似于 清单 5 中的 Java 代码。

清单 5. 用于将我们的 RHadoop 脚本集成到 J2EE 应用程序中的 Java 代码
public void predictLoanApp() {
Rengine engine = Rengine.getMainEngine();
ClassPathResource rScript = new ClassPathResource("loanAppScript.R");
REXP result = rengine.eval(String.format("source('%s')",rScript.getFile().getAbsolutePath()));
Double predictedValue = result.asDouble();
}

此外,因为我将一些表示测试数据的参数传递到了函数中,所以 Java 函数调用看起来类似于 清单 6 中的 Java 代码。

清单 6. 用于传递参数的 Java 函数
	public void predictLoanApp(Object obj) {
Rengine engine = Rengine.getMainEngine();

ClassPathResource rScript = new ClassPathResource("loanAppScript.R");
rengine.eval(String.format("source('%s')",rScript.getFile().getAbsolutePath()));

rengine.eval(String.eval("testData <- data.frame(F1=%s,F2=%s,....F7=%s)",
obj.getX1(), obj.getX2(),....,obj.getX7())");

REXP result = rengine.eval(String.eval("predict(loanAppModel,testData)");
Double predictedValue = result.asDouble();
}

obj 对象是一个传统的 Java 对象 (POJO),它有一些针对要测试的特性的值。借助 Java 函数(比如下面这个函数),可以将它插入服务层上的现有 J2EE 应用程序,还可以让 J2EE 中可用的任何视图技术(比如 JSP 或 JSF)使用它。为了确保数据的安全性,应使用本地 J2EE 安全机制,比如 JAAS,为了实现数据保密性,可以使用标准的 SSL 协议。

最后,为了处理传入到 J2EE 应用程序中的请求,可以使用 Flume,Flume 是 Hadoop 环境中的一个工具。在这里,我配置了一个 Flume 事件来构成一个代理,使用 J2EE 应用程序服务器日志作为其来源,使用 HDFS 作为其系统。Flume 配置文件看起来可能类似于 清单 7 中的代码片段。

清单 7. Flume 配置文件
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.bind = tail -f /opt/jboss/standalone/log/server.log
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:8020/data/tmp/system.log/
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel that buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

然后,我可以使用类似以下命令的一个命令来启动代理:

bin/flume-ng agent --conf conf --conf-file loanApp.conf --name a1 -Dflume.root.logger=INFO,console

结束语

虽然本教程重点介绍了一个 Hadoop 集群中的一组数据,但您可以将该 Hadoop 集群配置成一个更大的系统,将它连接到 Hadoop 集群的其他系统,甚至将它连接到云来实现更广泛的交互和更多的数据访问。在使用这种大数据基础架构时,其数据分析应用程序、已实现算法的学习速率会增加对空闲数据的访问,或加快数据处理,帮助您更好、更快、更准确地制定商业决策。


相关主题


评论

添加或订阅评论,请先登录注册

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Big data and analytics, Information Management
ArticleID=1007310
ArticleTitle=将 Hadoop 与现有的 RDBMS 相集成
publish-date=06012015