内容


将 Informix 数据引入到 Spark 中,第 4 部分

利用其他数据源的数据

添加外部数据源,获取更多洞察!

Comments

系列内容:

此内容是该系列 # 部分中的第 # 部分: 将 Informix 数据引入到 Spark 中,第 4 部分

敬请期待该系列的后续内容。

此内容是该系列的一部分:将 Informix 数据引入到 Spark 中,第 4 部分

敬请期待该系列的后续内容。

我们身在何处?我们将去往何处?

在本系列的前 3 部分中,您学习了如何:

您可能认为,您可以对传统关系数据库执行您目前完成的所有操作。完全正确。现在,是时候通过添加另外两个数据源来发现 Spark 的一些功能了。

为什么需要更多数据?

昨天早上,在庆祝国际咖啡日时,您无意中听到销售副总裁与新销售人员的对话。他们在讨论让销售人员赚得盆满钵满的销售区,以及该区域的零售店。顺便提醒一下,您的公司向全美国的零售店批发体育设备。尽管您没有完全理解他们的销售行话,但您对他们的数据知识感到非常失望。您不得不打断他们:“我可以分析销售数据,将其与每个邮政区的平均收入和此区域的人口规模交叉分析,然后可以了解哪个邮政区需要更多关注。”

您知道您会通过这番话交到一些新朋友。现在您需要提供一些分析数据。

甚至需要更多的数据!

您的第一反应是访问 IBM 的 developerWorks,这很不错。但是,您需要两个额外的数据集:每个邮政区的人口和收入。

要获得这些数据集,可以求助两个美国政府部门:美国人口统计局国内收入署 (IRS)。针对本教程的用途,我们使用了人口统计局的一个经过整理的版本,因为原始数据有点难以理解。

对 IRS 数据进行试验

对于 IRS 数据,第一个练习是使用原始 IRS 数据集,查找您的销售区域中收入高于 75,000 美元/年的预定义收入的家庭数量。

可以从 GitHub 下载代码和数据。对于本部分,实验数据包含在 net.jgp.labs.informix2spark.l4xx 包中。数据包含在 data 目录中。IRS 提供了该数据集的技术解释。解释文件的名称为 14zpdoc.doc,该文件包含在存储库的 data 目录中。

基本上讲,每个区域都是通过其邮政编码来定义的。每个区域根据营业额等级划分为 6 个调整后总收入 (AGI) 组:

  1. 低于 25,000 美元
  2. 25,000 美元至 50,000 美元之间
  3. 50,000 美元至 75,000 美元之间
  4. 75,000 美元至 100,000 美元之间
  5. 100,000 美元至 200,000 美元之间
  6. 高于 200,000 美元

本教程的目标是 3 个收益较高的组。

我们提供了所有代码,让您有一个全局视图。

package net.jgp.labs.informix2spark.l400;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HouseholdsAboveMedianRevenuePerZipApp {

  public static void main(String[] args) {
    HouseholdsAboveMedianRevenuePerZipApp app = new HouseholdsAboveMedianRevenuePerZipApp();

传递要分析的区域的邮政编码来运行该应用程序。

    app.start(27514);
  }

  private void start(int zip) {

在本地模式下创建一个 Spark 会话。

    SparkSession spark = SparkSession
        .builder()
        .appName("Number of households in your ZIP Code")
        .master("local")
        .getOrCreate();

将 CSV 文件加载到 Spark 中很简单:使用 format 方法和 csv 参数。CSV 加载器接受许多选项(众所周知,CSV 很难处理)。IRS 文件遵循一种非常简单的方案,其中使用了两个选项:

  • 文件中的第一行是标题
  • 让 Spark 对模式进行推断,使其能够确定您将要使用哪些数据类型。

如果没有让 Spark 推断模式,则会将此数据帧中的所有列都视为字符串。CSV 解析还有其他许多选项,每个选项都已在我的博客中进行了解释。

最后请注意,文件名使用了一个通配符。Spark 将所有与 14zpallagi*.csv 匹配的文件都加载到 data 目录中。可以在这里使用正则表达式:

  • 14zpallagi*.csv 读取所有以 14zpallagi 开头并以 .csv 结尾的文件,
  • 14zpallagi-part[1-3].csv 读取:14zpallagi-part1.csv14zpallagi-part2.csv14zpallagi-part3.csv
    String filename = "data/14zpallagi*.csv";
    Dataset<Row> df = spark
        .read()
        .format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load(filename);

现在可以检查已加载的数据:

    df.printSchema();

首先查看长模式。该模式之所以很长是因为 IRS 慷慨地共享了 127 个列(我删除了其中的许多列)。

root
 |-- STATEFIPS: integer (nullable = true)
 |-- STATE: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- agi_stub: integer (nullable = true)
 |-- N1: double (nullable = true)
 |-- mars1: double (nullable = true)
 |-- MARS2: double (nullable = true)
 |-- MARS4: double (nullable = true)
 |-- PREP: double (nullable = true)
 |-- N2: double (nullable = true)
 |-- NUMDEP: double (nullable = true)
 |-- TOTAL_VITA: double (nullable = true)
 |-- VITA: double (nullable = true)
 |-- TCE: double (nullable = true)
 |-- A00100: double (nullable = true)
 |-- N02650: double (nullable = true)
…

查看一个数据样本sample() 方法最多接受 3 个参数:替换性(或记录独立性)、一个小数和一个种子(可选)。

    df.sample(true, 0.01, 4589).show(2);
    System.out.println("Dataframe has " + df.count() + " rows and " + df.columns().length
        + " columns.");

我将输出限制为两行!

+---------+-----+-------+--------+-------+------+-------+-----+-------+--------+-------+----------+----+---+-----------+-------+-----------+-------+-----------+-------+--------+-------+--------+-------+--------+-------+-------+-------+--------+-------+---------+------+--------+-------+--------+------+------+------+-------+--------+-------+---------+-------+--------+------+------+------+-------+------+-------+------+-------+------+------+------+------+------+-------+-------+---------+-----------+-------+--------+------+------+-------+--------+-------+--------+-------+--------+-------+--------+-------+-----------+-------+---------+-------+-------+------+------+-------+-------+-------+-------+------+------+------+------+------+------+------+------+------+------+-------+--------+------+------+------+------+------+------+-------+---------+------+------+------+------+------+------+------+------+------+------+-------+---------+-------+---------+-------+-------+-------+-------+-------+--------+-------+--------+
|STATEFIPS|STATE|zipcode|agi_stub|     N1| mars1|  MARS2|MARS4|   PREP|      N2| NUMDEP|TOTAL_VITA|VITA|TCE|     A00100| N02650|     A02650| N00200|     A00200| N00300|  A00300| N00600|  A00600| N00650|  A00650| N00700| A00700| N00900|  A00900| N01000|   A01000|N01400|  A01400| N01700|  A01700|  SCHF|N02300|A02300| N02500|  A02500| N26270|   A26270| N02900|  A02900|N03220|A03220|N03300| A03300|N03270| A03270|N03150| A03150|N03210|A03210|N03230|A03230|N03240| A03240| N04470|   A04470|     A00101| N18425|  A18425|N18450|A18450| N18500|  A18500| N18300|  A18300| N19300|  A19300| N19700|  A19700| N04800|     A04800| N05800|   A05800| N09600| A09600|N05780|A05780| N07100| A07100| N07300| A07300|N07180|A07180|N07230|A07230|N07240|A07240|N07220|A07220|N07260|A07260| N09400|  A09400|N85770|A85770|N85775|A85775|N09750|A09750| N10600|   A10600|N59660|A59660|N59720|A59720|N11070|A11070|N10960|A10960|N11560|A11560| N06500|   A06500| N10300|   A10300| N85530| A85530| N85300| A85300| N11901|  A11901| N11902|  A11902|
+---------+-----+-------+--------+-------+------+-------+-----+-------+--------+-------+----------+----+---+-----------+-------+-----------+-------+-----------+-------+--------+-------+--------+-------+--------+-------+-------+-------+--------+-------+---------+------+--------+-------+--------+------+------+------+-------+--------+-------+---------+-------+--------+------+------+------+-------+------+-------+------+-------+------+------+------+------+------+-------+-------+---------+-----------+-------+--------+------+------+-------+--------+-------+--------+-------+--------+-------+--------+-------+-----------+-------+---------+-------+-------+------+------+-------+-------+-------+-------+------+------+------+------+------+------+------+------+------+------+-------+--------+------+------+------+------+------+------+-------+---------+------+------+------+------+------+------+------+------+------+------+-------+---------+-------+---------+-------+-------+-------+-------+-------+--------+-------+--------+
|        1|   AL|      0|       6|51270.0|4160.0|45860.0|910.0|39250.0|148020.0|50820.0|      80.0|80.0|0.0|2.2652783E7|51270.0|2.3073614E7|44660.0|1.1051815E7|40610.0|264670.0|32640.0|791136.0|31530.0|620013.0|29090.0|96411.0|13720.0|855431.0|31500.0|2331256.0|9340.0|444369.0|14820.0|833058.0|2630.0| 520.0|1842.0|11220.0|292923.0|22620.0|4633716.0|22380.0|420831.0|1080.0| 267.0|2770.0|86011.0|8520.0|92498.0|1140.0|10827.0|   0.0|   0.0|   0.0|   0.0|1660.0|61679.0|47840.0|2499208.0|2.1437993E7|45930.0|761404.0|1440.0|4867.0|45630.0|168515.0|47820.0|962132.0|36310.0|456194.0|45490.0|920293.0|51240.0|1.9692443E7|51230.0|5274998.0|15590.0|77387.0|   0.0|   0.0|21020.0|54852.0|15290.0|25542.0|3230.0|1747.0|   0.0|   0.0|   0.0|   0.0|   0.0|   0.0|1410.0| 459.0|14770.0|124954.0|   0.0|   0.0|   0.0|   0.0| 100.0| 219.0|50690.0|5185930.0|   0.0|   0.0|   0.0|   0.0|   0.0|   0.0|   0.0|   0.0|   0.0|   0.0|51170.0|5204318.0|51230.0|5472274.0|19090.0|33930.0|25450.0|89886.0|28340.0|774418.0|15560.0|243494.0|
|        1|   AL|  35004|       5|  590.0|  40.0|  530.0|  0.0|  300.0|  1660.0|  550.0|       0.0| 0.0|0.0|    74554.0|  590.0|    75493.0|  560.0|    64835.0|  260.0|   150.0|  140.0|   236.0|  130.0|   149.0|  350.0|  310.0|  100.0|  1671.0|  100.0|    364.0|  60.0|  1459.0|  150.0|  3991.0|   0.0|   0.0|   0.0|   90.0|  1796.0|   40.0|   1408.0|  240.0|   939.0|  30.0|   8.0|   0.0|    0.0|   0.0|    0.0|   0.0|    0.0| 130.0| 132.0|   0.0|   0.0|   0.0|    0.0|  450.0|   9296.0|    57663.0|  430.0|  2268.0|   0.0|   0.0|  420.0|   353.0|  450.0|  2766.0|  400.0|  2900.0|  420.0|  2321.0|  590.0|    56931.0|  590.0|   9612.0|    0.0|    0.0|   0.0|   0.0|  310.0|  448.0|   40.0|    3.0| 120.0|  66.0|  70.0|  99.0|   0.0|   0.0| 190.0| 249.0|   0.0|   0.0|   60.0|   228.0|   0.0|   0.0|   0.0|   0.0|   0.0|   0.0|  590.0|  10176.0|   0.0|   0.0|   0.0|   0.0|   0.0|   0.0|  60.0|  59.0|   0.0|   0.0|  580.0|   9179.0|  580.0|   9419.0|    0.0|    0.0|    0.0|    0.0|  200.0|   625.0|  380.0|  1553.0|
+---------+-----+-------+--------+-------+------+-------+-----+-------+--------+-------+----------+----+---+-----------+-------+-----------+-------+-----------+-------+--------+-------+--------+-------+--------+-------+-------+-------+--------+-------+---------+------+--------+-------+--------+------+------+------+-------+--------+-------+---------+-------+--------+------+------+------+-------+------+-------+------+-------+------+------+------+------+------+-------+-------+---------+-----------+-------+--------+------+------+-------+--------+-------+--------+-------+--------+-------+--------+-------+-----------+-------+---------+-------+-------+------+------+-------+-------+-------+-------+------+------+------+------+------+------+------+------+------+------+-------+--------+------+------+------+------+------+------+-------+---------+------+------+------+------+------+------+------+------+------+------+-------+---------+-------+---------+-------+-------+-------+-------+-------+--------+-------+--------+
only showing top 2 rows
The dataframe has 166719 rows and 127 columns.

您不需要清理您的数据集,但我发现这样做更便于阅读。要清理您的数据集,可以按邮政编码进行过滤并丢弃额外的列。

    Dataset<Row> df2 = df.filter(df.col("zipcode").equalTo(zip));
    String[] colsToDrop = { "STATEFIPS", "mars1", "MARS2", "MARS4", "PREP", "N2",
        "NUMDEP", "TOTAL_VITA", "VITA", "TCE", "A00100", "N02650", "N00200", "A00200",
        "N00300", "A00300", "N00600", "A00600", "N00650", "A00650", "N00700", "A00700",
        "N00900", "A00900", "N01000", "A01000", "N01400", "A01400", "N01700", "A01700",
        "SCHF", "N02300", "A02300", "N02500", "A02500", "N26270", "A26270", "N02900",
        "A02900", "N03220", "A03220", "N03300", "A03300", "N03270", "A03270", "N03150",
        "A03150", "N03210", "A03210", "N03230", "A03230", "N03240", "A03240", "N04470",
        "A04470", "A00101", "N18425", "A18425", "N18450", "A18450", "N18500", "A18500",
        "N18300", "A18300", "N19300", "A19300", "N19700", "A19700", "N04800", "A04800",
        "N05800", "A05800", "N09600", "A09600", "N05780", "A05780", "N07100", "A07100",
        "N07300", "A07300", "N07180", "A07180", "N07230", "A07230", "N07240", "A07240",
        "N07220", "A07220", "N07260", "A07260", "N09400", "A09400", "N85770", "A85770",
        "N85775", "A85775", "N09750", "A09750", "N10600", "A10600", "N59660", "A59660",
        "N59720", "A59720", "N11070", "A11070", "N10960", "A10960", "N11560", "A11560",
        "N06500", "A06500", "N10300", "A10300", "N85530", "A85530", "N85300", "A85300",
        "N11901", "A11901", "N11902", "A11902" };
    for (String colName : colsToDrop) {
      df2 = df2.drop(colName);
    }
    df2.printSchema();
    df2.show();
    System.out.println("Dataframe has " + df2.count() + " rows and " + df2
        .columns().length + " columns.");

现在来查看一下结果。您获得了更容易理解的结果。

root
 |-- STATE: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- agi_stub: integer (nullable = true)
 |-- N1: double (nullable = true)
 |-- A02650: double (nullable = true)

+-----+-------+--------+------+--------+
|STATE|zipcode|agi_stub|    N1|  A02650|
+-----+-------+--------+------+--------+
|   NC|  27514|       1|3590.0| 42542.0|
|   NC|  27514|       2|2030.0| 74332.0|
|   NC|  27514|       3|1040.0| 65651.0|
|   NC|  27514|       4| 800.0| 71410.0|
|   NC|  27514|       5|1690.0|249042.0|
|   NC|  27514|       6|1650.0|843353.0|
+-----+-------+--------+------+--------+
Dataframe has 6 rows and 5 columns.

对于这个较小的数据集,可以看到您关心的是 agi_stub 大于 3 的记录。您想要对它们进行统计,按邮政编码进行分组,然后对列 N1 中返回的数量进行求和。Spark 代码为:

    Dataset<Row> df3 = df2.filter(df2.col("agi_stub").$greater(3));
    df3 = df3.groupBy("zipcode").sum("N1").withColumnRenamed("sum(N1)", "households");
    df3.show();
  }
}

您会获得:

+-------+----------+
|zipcode|households|
+-------+----------+
|  27514|    4140.0|
+-------+----------+

这些结果是何含义?在这个邮政区中,4,140 笔税收是由收入高于 75,000 美元的人上缴的。税收笔数与家庭的比例不是 1:1,但它能让您看清销售潜力。

如何在 Spark 中实现此操作?

收入数据集的大小接近 200MB,而您将仅使用它的一小部分。 经过整理的人口统计数据只有 400KB。将此数据集加载到您主要用于销售和仓库交易的 Informix® 数据库中是否有意义?或许没有意义。不要误会,Informix 能完美地处理这些数据集,但这不是它的职责。

在您成长为数据科学家的路上,会逐渐添加越来越多的数据集,并对它们进行试验(而且可能使用 IBM Data Science Experience 等专用工具)。但是,您可能不希望每次试验都将所有这些数据集加载到生产数据库中。

回归正题

向销售人员展示了这些第一批结果后,你们一起发现了一个预测潜在收入的不错指标。

基本思路是:

  • 找到最佳销量区域。
  • 考虑采用一些关键数字作为参考:人口 (ref_pop)、您在这个区域的当前营业额 (ref_rev),以及收入 (ref_inc)。
  • 通过比较人口 (pop)、营业额 (rev) 和平均收益 (inc),调整每个邮政区的潜在收入。

备注:在本文的剩余部分中,index 指的是“指数”。不要将该指数与数据库索引混淆。

对于每个区域或邮政区,应用:

这很不错,但如何在 Spark 中实现此操作?

从业务到开发

可以在 net.jgp.labs.informix2spark.l420 包中找到一个示例,它是一个名为 SalesTargetApp 的应用程序。

初始化

您可以浏览一下该代码,先执行导入,然后执行初始化。

package net.jgp.labs.informix2spark.l420;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

import java.math.BigDecimal;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;

import net.jgp.labs.informix2spark.utils.Config;
import net.jgp.labs.informix2spark.utils.ConfigManager;
import net.jgp.labs.informix2spark.utils.InformixJdbcDialect;
import net.jgp.labs.informix2spark.utils.K;
import scala.collection.Seq;

public class SalesTargetApp {

  SparkSession spark;

  public SalesTargetApp() {
    init();
  }

  private void init() {
    this.spark = SparkSession
        .builder()
        .appName("Sales Target")
        .master("local")
        .getOrCreate();
  }

  public static void main(String[] args) {
    SalesTargetApp app = new SalesTargetApp();
    app.start();
  }

与前面的所有示例一样,所有示例都以 start() 方法开头。

  private void start() {
    Dataset<Row> householdDf = getHouseholdDataframe();
    Dataset<Row> populationDf = getPopulationDataframe();
    Dataset<Row> indexDf = joinHouseholdPopulation(householdDf, populationDf);
    Dataset<Row> salesDf = getSalesData();

您通过调用方法构建了 4 个数据帧。让我们详细分析一下它们。

家庭数据

此方法非常类似于本教程中对 IRS 数据执行的第一个试验。首先读取 CSV 文件。

  private Dataset<Row> getHouseholdDataframe() {
    String filename = "data/14zpallagi*.csv";
    Dataset<Row> df = spark.read().format("csv")
        .option("inferSchema", "true")
        .option("header", "true").load(filename);

像在 SQL 中一样,选择您感兴趣的列。这是丢弃您不想要的所有列的替代方法。

    df = df.select(
        df.col("zipcode"),
        df.col("agi_stub"),
        df.col("N1"),
        df.col("A02650"),

在最后一列中,您想要获得每组的所有税收的总收入。请记住,IRS 将该数据拆分为 6 组。

        df.col("N1").multiply(df.col("A02650")));

此操作在数据帧的末尾处创建了一个名为 (N1 * A02650) 的列。这不是一个描述性的列名称,所以您决定将它重命名为 incomedf.columns() 返回列名称的列表,df.columns()[df.columns().length - 1] 提供了最后一列的名称。

    df = df.withColumnRenamed(df.columns()[df.columns().length - 1], "income");

因为您对分析每个邮政区的 AGI 不感兴趣,所以需要按邮政编码对不同的 AGI 类别进行分组。在执行此操作的过程中,还可以将所有收益添加到一个重命名为 total_income 的列中。

    df = df.groupBy("zipcode").sum("income");
    df = df.withColumnRenamed(df.columns()[df.columns().length - 1], "total_income");

    return df;
  }

如果此刻显示了该数据帧,您会看到:

+-------+------------+
|zipcode|total_income|
+-------+------------+
|  35071| 4.7763307E8|
|  36525|   6306850.0|
|  36538|    201690.0|
|  85253|9.45181039E9|
|  85321|  1.290294E7|
+-------+------------+
only showing top 5 rows

人口

有了您目前获得的所有经验,加载人口数据帧就会很简单。

  private Dataset<Row> getPopulationDataframe() {
    String filename = "data/2010+Census+Population+By+Zipcode+(ZCTA).csv";
    Dataset<Row> df = spark.read().format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load(filename);

还有另一种重命名列的方法,但采用该方法需要提供列名称(而且您需要确实拥有列名称)。

    df = df.withColumnRenamed("Zip Code ZCTA", "zipcode");
    df = df.withColumnRenamed("2010 Census Population", "pop");
    return df;
  }

您的数据帧类似于:

+-------+-----+
|zipcode|  pop|
+-------+-----+
|   1001|16769|
|   1002|29049|
|   1003|10372|
|   1005| 5079|
|   1007|14649|
+-------+-----+
only showing top 5 rows

联接家庭收益数据和人口数据

第 3 部分中学到的一样,采用一种外联接方式来联接该邮政区的两个数据集 - 这意味着可能会有一些 null 值。

  private Dataset<Row> joinHouseholdPopulation(
      Dataset<Row> householdDf,
      Dataset<Row> populationDf) {
    Dataset<Row> df = householdDf
        .join(
            populationDf,
            householdDf.col("zipcode").equalTo(populationDf.col("zipcode")),
            "outer")
        .drop(populationDf.col("zipcode"))

现在,创建一个名为 income_per_inh 的新列。此列包含按人口对总收益进行划分的结果。 它提供了每位居民的预计收益。

        .withColumn(
            "income_per_inh",
            householdDf.col("total_income").divide(populationDf.col("pop")));
    return df;
  }

withColumn() 支持使用一个或多个列和函数,在您的数据帧中创建一个新列。 随着您执行的转换越来越多,withColumn() 可能会成为您最喜欢的方法。使用该方法比尝试查找最后一个要重命名的列更容易。

以下是这个新数据帧现在的外观:

+-------+------------+-----+------------------+
|zipcode|total_income|  pop|    income_per_inh|
+-------+------------+-----+------------------+
|   1088|   1144910.0|  670|1708.8208955223881|
|   1238|  7.228838E7| 6047|11954.420373739043|
|   1342|   4992920.0| 1492| 3346.461126005362|
|   2122|1.09356174E9|23479| 46576.16338004174|
|   2142| 1.0935586E8| 3141| 34815.61922954473|
+-------+------------+-----+------------------+
only showing top 5 rows

销售数据

对于销售数据,重用您为本教程系列的第 3 部分编写的代码。您需要将它放在一个方法中。

  private Dataset<Row> getSalesData() {
…
    return salesDf;
  }

请记住,所有代码都已在 GitHub 上提供。

您的销售数据帧现在包含邮政编码和您获得的收入数据。

+-------+-------+
|zipcode|revenue|
+-------+-------+
|  94062|1390.00|
|  94040| 562.00|
|  94022| 448.00|
|  19898|1131.00|
|  74006|1614.00|
+-------+-------+
only showing top 5 rows

组合所有数据集,预测销量

您现在已拥有所有数据集,并且已准备好构建您与销售团队一起定义的指数。

    Dataset<Row> salesIndexDf = salesDf
        .join(indexDf, salesDf.col("zipcode").equalTo(indexDf.col("zipcode")), "left")
        .drop(indexDf.col("zipcode"));

查看人均收入。

    salesIndexDf = salesIndexDf.withColumn("revenue_by_inh", salesIndexDf.col("revenue")
        .divide(salesIndexDf.col("pop")));

现在识别最佳销量区域。

    salesIndexDf = salesIndexDf.orderBy(col("revenue_by_inh").desc());

提取“最佳行”。最佳行包含要供销售团队选择最佳区域时参考的所有值。

    Row bestRow = salesIndexDf.first();
    double bestRevenuePerInhabitant = ((BigDecimal) bestRow.getAs("revenue_by_inh"))
        .doubleValue();
    int populationOfBestRevenuePerInhabitant = bestRow.getAs("pop");
    double incomeOfBestRevenuePerInhabitant = bestRow.getAs("income_per_inh");

接下来,在您的数据帧中创建一列。您可以使用 withColumn() 方法。但是如果您想添加一个包含特定值的列,该怎么做?为此,可以接受一个数字列,按该数字列的值(您拥有值 1)对其进行拆分,然后将它与该值相乘。

    salesIndexDf = salesIndexDf.withColumn(
        "best_revenue_per_inh",
        salesIndexDf.col("pop").divide(salesIndexDf.col("pop"))
            .multiply(bestRevenuePerInhabitant));

或者可以使用 lit() 静态函数:

    salesIndexDf = salesIndexDf.withColumn(
        "pop_of_best",
        lit(populationOfBestRevenuePerInhabitant));
    salesIndexDf = salesIndexDf.withColumn(
        "income_of_best",
        lit(incomeOfBestRevenuePerInhabitant));

现在您已准备好创建这 3 个指数。

    salesIndexDf = salesIndexDf.withColumn(
        "idx_revenue",
        salesIndexDf.col("best_revenue_per_inh")
            .divide(salesIndexDf.col("revenue_by_inh")));
    salesIndexDf = salesIndexDf.withColumn(
        "idx_pop",
        salesIndexDf.col("pop").divide(salesIndexDf.col("pop_of_best")));
    salesIndexDf = salesIndexDf.withColumn(
        "idx_income",
        salesIndexDf.col("income_per_inh").divide(salesIndexDf.col("income_of_best")));

现在是时候为每个区域创建最后一个指数了,即每个指数的乘积。

    salesIndexDf = salesIndexDf.withColumn(
        "index",
        salesIndexDf.col("idx_revenue").multiply(salesIndexDf.col("idx_pop")
            .multiply(salesIndexDf.col("idx_income"))));

将该指数应用于现有的收入。

 salesIndexDf = salesIndexDf.withColumn(
        "potential_revenue",
        salesIndexDf.col("revenue").multiply(salesIndexDf.col("index")));

您可以丢弃一些列来增强输出。您可以丢弃更多列,但也需要让您的报告看起来足够科学。 最后,确保按潜在收入的降序进行了排序。

    salesIndexDf = salesIndexDf
        .drop("idx_income")
        .drop("idx_pop")
        .drop("idx_revenue")
        .drop("income_of_best")
        .drop("pop_of_best")
        .drop("best_revenue_per_inh")
        .orderBy(salesIndexDf.col("potential_revenue").desc());

您可以查看结果数据。

    salesIndexDf.show();
  }
+-------+-------+-----+------------------+------------------+------------------+
|zipcode|revenue|  pop|    income_per_inh|             index| potential_revenue|
+-------+-------+-----+------------------+------------------+------------------+
|  94025|  84.00|40526| 840368.1256477323|1610.5247342457083|135284.07767663948|
|  08540|1499.97|47115|469565.43117903004| 68.11481294046366|102170.17596630729|
|  94086|1200.00|45697| 244836.9227739239| 41.76194133319635| 50114.32959983562|
|  94062|1390.00|25876| 738260.2450146854| 34.85768977158333| 48452.18878250083|
|  80219| 232.00|61296|104358.72308144088| 165.6588621914614|38432.856028419046|
|  94022| 448.00|18500|1081220.6994594594| 80.96352858991159|36271.660808280394|
|  94040| 562.00|32996|257082.76791126197|  48.8167513518355| 27435.01425973155|
|  32256| 438.00|38483|142462.90881688017| 47.21447575076384|20679.940378834563|
|  85016| 584.00|33896| 94057.98914326174| 18.13799917867292|10592.591520344986|
|  94063|5592.00|30949| 59561.31635917154|               1.0|            5592.0|
|  94085| 450.00|21247| 95544.70842942533| 9.395047814603645|  4227.77151657164|
|  74006|1614.00|25750| 63162.90718446602|2.5434469314609442| 4105.123347377964|
|  08002| 654.00|22274|59319.770584538026| 4.410905752434176| 2884.732362091951|
|  60406| 824.00|25460| 36702.44422623723|2.8300506455361445| 2331.961731921783|
|  94026|1451.80| null|              null|              null|              null|
|  19898|1131.00| null|              null|              null|              null|
+-------+-------+-----+------------------+------------------+------------------+

结果和验证

您以邮政编码为 94063 的区域的销售额作为参照,您在这里的销售额为 5,592 美元。 此区域的指数为 1。

基于您构建的指数,可以看到最有潜力的是邮政编码为 94025 的区域,这是加利福尼亚州的门洛帕克(有趣的是,这是 Informix 的诞生地)。数据表明,这里每个家庭的收入都处于国内最高水平。因此,可以合理的认为邮政编码为 94025 的区域具有更高的销售潜力。

旧金山湾区的地图
旧金山湾区的地图

潜力最低的区域的邮政编码为 60406,也就是伊利诺斯州的蓝岛,这是芝加哥南部的一个人口稀少的地区。

伊利诺斯州南部的地图
伊利诺斯州南部的地图

您可以看到具有最高潜力的 5 个邮政区:加利福尼亚州的门洛帕克、新泽西州的普林斯顿、加利福尼亚州的森尼维耳、加利福尼亚州的雷德伍德城,以及科罗拉多州的丹佛。您的销售经理可能会得出这样的结论:可以合理地加强这些区域的销售工作。

学到的知识

在本教程系列的第 4 部分中,您学习了:

  • 如何执行高级分析。尽管您的企业内部可能没有所有数据,但您可以从行政部门或开放数据门户下载外部数据集。
  • Spark 在来自 RDBMS 或外部文件的数据上同样表现良好。
  • 借助所有 API 或语言,可以通过不同方式实现相同的目的。最好先达成一致并阐明概念,这将有助于代码维护。
  • 如何验证您的结果。您的发现与直觉是一致的:加利福尼亚州的人有更多钱投入到体育设备上。

继续探索


评论

添加或订阅评论,请先登录注册

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Open source, Big data and analytics
ArticleID=1053809
ArticleTitle=将 Informix 数据引入到 Spark 中,第 4 部分: 利用其他数据源的数据
publish-date=11292017