内容


将 Informix 数据引入到 Spark 中,第 3 部分

复杂分析

结合其他数据源来利用数据

Comments

系列内容:

此内容是该系列 # 部分中的第 # 部分: 将 Informix 数据引入到 Spark 中,第 3 部分

敬请期待该系列的后续内容。

此内容是该系列的一部分:将 Informix 数据引入到 Spark 中,第 3 部分

敬请期待该系列的后续内容。

我们身在何处?我们将去往何处?

在本教程系列的第 1 部分中,您学习了如何将一个表从 Informix 移动到 Spark 中,如何将整个数据库放在 Spark 中,以及如何为 Informix 构建一种特定方言。第 2 部分重点介绍了基本分析以及如何从数据中获取洞察。

第 3 部分将更详细介绍连接和更复杂的查询。您将继续使用 Informix 样本数据库,但不需要任何 Informix 知识。

更多数据!

您将使用以下 4 个表:

  • Customer
  • Orders
  • Item(订单中的每一行)
  • Stock(对您销售的产品的描述)
已填充的表的操作流程。
已填充的表的操作流程。

在这个数据库的结构中,唯一要注意的地方是(Stock 表中的)产品由两个键唯一标识:stock_nummanu_codemanu_code 字段定义制造商,您不会使用它。尽管这个表称为 Stock,但它不会提供库存/仓库中的物品数。这有点难以理解,但在您继承一个旧系统时,其中的内容可能令您困惑,对吧?

您可能采用了不同的设计方式,但我想这是挑战(和乐趣)的一部分:对 80 年代末设计的数据库执行分析。为实现教程的目标设计一个自定义数据库太容易了!

未售商品

第二个应用程序深入分析数据,查找您已引用但从未售出的物品。您的销售人员可能想通过销售、返回给制造商和其他方式处理掉这些产品。但我们是 IT 人员,所以我们不要太多地干扰销售人员和购买者的创造力。

这个示例 UnsoldProductsApp.java(在 net.jgp.labs.informix2spark.l300 包 中)和上一个示例 TimeToShipApp.java(在 net.jgp.labs.informix2spark.l200 包中)有许多通用的代码。我希望提供完整的示例,但在本例中,因为我们的示例是基于上一个示例来构建的,所以我将重点关注发生改动的地方。

保留“数据湖”,它包含 4 个表:

List<String> tables = new ArrayList<>();
tables.add("customer");
tables.add("orders");
tables.add("items");
tables.add("stock");

请求 Spark 加载它们:

Map<String, Dataset<Row>> datalake = new HashMap<>();
    for (String table : tables) {
      System.out.print("Loading table [" + table + "] ...");
      Dataset<Row> df = spark.read()
          .format("jdbc")
          .option("url", config.getJdbcUrl())
          .option("dbtable", table)
          .option("user", config.getUser())
          .option("password", config.getPassword())
          .option("driver", config.getDriver())
          .load();
      datalake.put(table, df);
      System.out.println("done");
    }

    System.out.println("We have loaded " + datalake.size() + " table(s) in our data lake");

目前为止,执行此程序会得到以下结果:

Loading table [customer] ... done
Loading table [orders] ... done
Loading table [items] ... done
Loading table [stock] ... done
We have loaded 4 table(s) in our data lake

可以快速浏览一下数据:

for (String table : tables) {
      Dataset<Row> df = datalake.get(table);
      System.out.println("Number of rows in " + table + ": " + df.count());
      df.show(10);
      df.printSchema();
    }

您应会获得类似下面这样的结果(仅限于第 4 个数据集):

...
Number of rows in stock: 74
+---------+---------+---------------+----------+----+---------------+
|stock_num|manu_code|    description|unit_price|unit|     unit_descr|
+---------+---------+---------------+----------+----+---------------+
|        1|      HRO|baseball gloves|    250.00|case|10 gloves/case |
|        1|      HSK|baseball gloves|    800.00|case|10 gloves/case |
|        1|      SMT|baseball gloves|    450.00|case|10 gloves/case |
|        2|      HRO|baseball       |    126.00|case|24/case        |
|        3|      HSK|baseball bat   |    240.00|case|12/case        |
|        3|      SHM|baseball bat   |    280.00|case|12/case        |
|        4|      HSK|football       |    960.00|case|24/case        |
|        4|      HRO|football       |    480.00|case|24/case        |
|        5|      NRG|tennis racquet |     28.00|each|each           |
|        5|      SMT|tennis racquet |     25.00|each|each           |
+---------+---------+---------------+----------+----+---------------+
only showing top 10 rows

root
 |-- stock_num: integer (nullable = true)
 |-- manu_code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- unit_price: decimal(6,2) (nullable = true)
 |-- unit: string (nullable = true)
 |-- unit_descr: string (nullable = true)

可以将每个数据集分配给一个数据帧 (Dataset<Row>)。 在以后可以更轻松地重用它们。

Dataset<Row> ordersDf = datalake.get("orders");
    Dataset<Row> customerDf = datalake.get("customer");
    Dataset<Row> itemsDf = datalake.get("items");
    Dataset<Row> stockDf = datalake.get("stock");

通过连接来查看更多数据

众所周知,要获得更多洞察,就需要连接这些表:

Seq<String> stockColumns = new Set2<>("stock_num", "manu_code").toSeq();

    Dataset<Row> allDf = customerDf
        .join(
            ordersDf, 
            customerDf.col("customer_num").equalTo(ordersDf.col("customer_num")), 
            "full_outer")
        .join(
            itemsDf, 
            ordersDf.col("order_num").equalTo(itemsDf.col("order_num")), 
            "full_outer")
        .join(stockDf, stockColumns, "full_outer")
        .drop(ordersDf.col("customer_num"))
        .drop(itemsDf.col("order_num"))
        .drop(stockDf.col("stock_num"))
        .drop(stockDf.col("manu_code"));

因为我们使用了方法链接,所以这种转换看起来非常复杂。方法链接正变得越来越流行。在这里,使用 4 列来连接 4 个表,并丢弃重复的列。

拆分连接流程

为了更好地理解连接流程,可以将转换操作拆分为更加原子化的操作,比如:

Dataset<Row> allDf = customerDf.join(
        ordersDf, 
        customerDf.col("customer_num").equalTo(ordersDf.col("customer_num")), 
        "full_outer");
    allDf = allDf.join(
        itemsDf, 
        ordersDf.col("order_num").equalTo(itemsDf.col("order_num")), 
        "full_outer");
    Seq<String> stockColumns = new Set2<>("stock_num", "manu_code").toSeq();
    allDf = allDf.join(stockDf, stockColumns, "full_outer");
...

可以分步执行这些操作。分步执行代码也包含在 GitHub 存储库中;在 net.jgp.labs.informix2spark.l301 包中查找 UnsoldProductsSplitJoinApp 类。

现在可以将 orders 表与 customer_num 列上的 customer 表连接,执行一次外部连接。

Dataset<Row> allDf = customerDf.join(
        ordersDf, 
        customerDf.col("customer_num").equalTo(ordersDf.col("customer_num")), 
        "full_outer");
    allDf.printSchema();
    allDf.show();

您的数据帧的模式类似于:

root
 |-- customer_num: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- company: string (nullable = true)
 |-- address1: string (nullable = true)
 |-- address2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- order_num: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- customer_num: integer (nullable = true)
 |-- ship_instruct: string (nullable = true)
 |-- backlog: string (nullable = true)
 |-- po_num: string (nullable = true)
 |-- ship_date: date (nullable = true)
 |-- ship_weight: decimal(8,2) (nullable = true)
 |-- ship_charge: decimal(6,2) (nullable = true)
 |-- paid_date: date (nullable = true)

请注意,您有两个名为 customer_num 的列。一列来自左侧的 customer 表(更准确地讲是 customerDf 数据帧),另一列来自右侧的 orders 表(或 ordersDf 数据帧)。

连接数据帧类似于使用 Informix 和其他 RDBMS 来连接表:从左往右连接,指定一个条件(在这里我们使用了等值)和一种连接方式(full_outer、left 等)。

我通过删除额外的列来简化数据输出。查看该数据,可以注意到有两个 customer_num 列,但它们的值不同。为什么?

+------------+--------------------+---------+----------+------------+----------+----------+
|customer_num|             company|order_num|order_date|customer_num|    po_num| paid_date|
+------------+--------------------+---------+----------+------------+----------+----------+
|         108|Quinn's Sports      |     null|      null|        null|      null|      null|
|         101|All Sports Supplies |     1002|2008-05-21|         101|9270      |2008-06-03|
|         115|Gold Medal Sports   |     1010|2008-06-17|         115|429Q      |2008-08-22|
|         126|Neelie's Discount Sp|     1022|2008-07-24|         126|W9925     |2008-09-02|
|         103|Phil's Sports       |     null|      null|        null|      null|      null|
|         128|Phoenix University  |     null|      null|        null|      null|      null|
|         122|The Sporting Life   |     1019|2008-07-11|         122|Z55709    |2008-08-06|
|         111|Sports Center       |     1009|2008-06-14|         111|4745      |2008-08-21|
|         120|Century Pro Shop    |     1017|2008-07-09|         120|DM354331  |      null|
|         117|Kids Korner         |     1007|2008-05-31|         117|278693    |      null|
|         117|Kids Korner         |     1012|2008-06-18|         117|278701    |      null|
|         112|Runners & Others    |     1006|2008-05-30|         112|Q13557    |      null|
|         127|Big Blue Bike Shop  |     1023|2008-07-24|         127|KF2961    |2008-08-22|
|         107|Athletic Supplies   |     null|      null|        null|      null|      null|
|         114|Sporting Place      |     null|      null|        null|      null|      null|
|         102|Sports Spot         |     null|      null|        null|      null|      null|
|         113|Sportstown          |     null|      null|        null|      null|      null|
|         121|City Sports         |     1018|2008-07-10|         121|S22942    |2008-08-06|
|         125|Total Fitness Sports|     null|      null|        null|      null|      null|
|         109|Sport Stuff         |     null|      null|        null|      null|      null|
|         105|Los Altos Sports    |     null|      null|        null|      null|      null|
|         110|AA Athletics        |     1008|2008-06-07|         110|LZ230     |2008-07-21|
|         110|AA Athletics        |     1015|2008-06-27|         110|MA003     |2008-08-31|
|         106|Watson & Son        |     1004|2008-05-22|         106|8006      |      null|
|         106|Watson & Son        |     1014|2008-06-25|         106|8052      |2008-07-10|
|         116|Olympic City        |     1005|2008-05-24|         116|2865      |2008-06-21|
|         123|Bay Sports          |     1020|2008-07-11|         123|W2286     |2008-09-20|
|         119|The Triathletes Club|     1016|2008-06-29|         119|PC6782    |      null|
|         131|JGP.net             |     null|      null|        null|      null|      null|
|         118|Blue Ribbon Sports  |     null|      null|        null|      null|      null|
|         124|Putnum's Putters    |     1021|2008-07-23|         124|C3288     |2008-08-22|
|         104|Play Ball!          |     1001|2008-05-20|         104|B77836    |2008-07-22|
|         104|Play Ball!          |     1003|2008-05-22|         104|B77890    |2008-06-14|
|         104|Play Ball!          |     1011|2008-06-18|         104|B77897    |2008-08-29|
|         104|Play Ball!          |     1013|2008-06-22|         104|B77930    |2008-07-31|
+------------+--------------------+---------+----------+------------+----------+----------+

第一个 customer_num 列属于左侧的数据帧。 因为您执行了一次完全外部连接,所以您获得了所有数据,甚至包括来自未下单的客户的数据。正因如此,第二个 customer_num 列中的一些条目包含 null 值,比如第 4 或第 8 行上。

如果您以后想引用此列,Spark 可能无法理解您调用的是哪个 customer_num 列。因此,删除不想要的列(比如第 2 列)是合乎情理的。

要删除不想要的列,请在数据帧上调用 drop() 方法。请注意参数:通过使用数据帧的 col() 方法来指定想要丢弃哪一列。

allDf = allDf.drop(ordersDf.col("customer_num"));

然后,可以连接另外两个表。

下面的连接仅将数据帧 (allDf) 和 order_num 列上的项连接,然后丢弃额外的列:

allDf = allDf.join(
        itemsDf, 
        ordersDf.col("order_num").equalTo(itemsDf.col("order_num")), 
        "full_outer");
    allDf = allDf.drop(itemsDf.col("order_num"));

有两个列链接到最后一个表:stock_nummanu_code。您需要将它们组装到一个序列 (Seq) 中。这里使用的序列和集合是 Scala 对象,但您在 Java 代码中也可以轻松地使用它们。

Seq<String> stockColumns = new Set2<>("stock_num",
                "manu_code").toSeq();

现在,连接语法更加简单:

allDf = allDf.join(stockDf, stockColumns, "full_outer");

你可以删除最后两列:

allDf = allDf.drop(stockDf.col("stock_num"));
allDf = allDf.drop(stockDf.col("manu_code"));

到此,我们已经详细演练了原子连接操作与方法链接。如有任何疑问,请在本教程底部自由发表评论。

利用整个数据帧

现在您已将所有数据放在一个数据帧中!可以开始深入分析它来获取价值了。

缓存您的数据是一个良好的习惯,这样,您在每次需要它时就不需要再重新计算。

allDf.cache();

也可以输出模式或查看一些数据行。

allDf.printSchema();
allDf.show(5);

在该模式中,可以看到没有任何重复行。

root
 |-- stock_num: integer (nullable = true)
 |-- manu_code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- unit_price: decimal(6,2) (nullable = true)
 |-- unit: string (nullable = true)
 |-- unit_descr: string (nullable = true)
 |-- stock_num: integer (nullable = true)
 |-- manu_code: string (nullable = true)
 |-- customer_num: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- company: string (nullable = true)
 |-- address1: string (nullable = true)
 |-- address2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- order_num: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- customer_num: integer (nullable = true)
 |-- ship_instruct: string (nullable = true)
 |-- backlog: string (nullable = true)
 |-- po_num: string (nullable = true)
 |-- ship_date: date (nullable = true)
 |-- ship_weight: decimal(8,2) (nullable = true)
 |-- ship_charge: decimal(6,2) (nullable = true)
 |-- paid_date: date (nullable = true)
 |-- item_num: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- total_price: decimal(8,2) (nullable = true)
 |-- description: string (nullable = true)
 |-- unit_price: decimal(6,2) (nullable = true)
 |-- unit: string (nullable = true)
 |-- unit_descr: string (nullable = true)

And the data is indeed wide as there are now all those columns resulting from the merge.
(line numbers useful here)
+---------+---------+------------+---------------+---------------+--------------------+--------------------+--------+---------------+-----+-------+------------------+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+--------+--------+-----------+---------------+----------+----+---------------+
|stock_num|manu_code|customer_num|          fname|          lname|             company|            address1|address2|           city|state|zipcode|             phone|order_num|order_date|customer_num|       ship_instruct|backlog|    po_num| ship_date|ship_weight|ship_charge| paid_date|item_num|quantity|total_price|    description|unit_price|unit|     unit_descr|
+---------+---------+------------+---------------+---------------+--------------------+--------------------+--------+---------------+-----+-------+------------------+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+--------+--------+-----------+---------------+----------+----+---------------+
|      101|      SHM|         119|Bob            |Shorter        |The Triathletes Club|2405 Kings Highway  |    null|Cherry Hill    |   NJ|  08002|609-663-6079      |     1016|2008-06-29|         119|delivery entrance...|      n|PC6782    |2008-07-12|      35.00|      11.80|      null|       1|       2|     136.00|bicycle tires  |     68.00|box |4/box          |
|      309|      SHM|        null|           null|           null|                null|                null|    null|           null| null|   null|              null|     null|      null|        null|                null|   null|      null|      null|       null|       null|      null|    null|    null|       null|ear drops      |     40.00|case|20/case        |
|      109|      PRC|         119|Bob            |Shorter        |The Triathletes Club|2405 Kings Highway  |    null|Cherry Hill    |   NJ|  08002|609-663-6079      |     1016|2008-06-29|         119|delivery entrance...|      n|PC6782    |2008-07-12|      35.00|      11.80|      null|       2|       3|      90.00|pedal binding  |     30.00|case|6 pairs/case   |
|        6|      ANZ|         116|Jean           |Parmelee       |Olympic City        |1104 Spinosa Drive  |    null|Mountain View  |   CA|  94040|415-534-8822      |     1005|2008-05-24|         116|call before deliv...|      n|2865      |2008-06-09|      80.80|      16.20|2008-06-21|       4|       1|      48.00|tennis ball    |     48.00|case|24 cans/case   |
|        6|      ANZ|         115|Alfred         |Grant          |Gold Medal Sports   |776 Gary Avenue     |    null|Menlo Park     |   CA|  94025|415-356-1123      |     1010|2008-06-17|         115|deliver 776 King ...|      n|429Q      |2008-06-29|      40.60|      12.30|2008-08-22|       2|       1|      48.00|tennis ball    |     48.00|case|24 cans/case   |
+---------+---------+------------+---------------+---------------+--------------------+--------------------+--------+---------------+-----+-------+------------------+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+--------+--------+-----------+---------------+----------+----+---------------+
only showing top 5 rows

清空您的仓库

留给您的练习是找到从未售出的产品。

先删除您不需要的列。在上一个示例中,您一次删除了一列。既然您是一位 Spark 专家,您可以稍微高效一点,对吧?按照同样的思路,在掌握方法链接后,现在可以在一次调用中执行此操作,对吧?

首先,创建一些列。

List<String> columnsToDrop = new ArrayList<>();
    columnsToDrop.add("zipcode");
    columnsToDrop.add("phone");
    columnsToDrop.add("customer_num");
    columnsToDrop.add("fname");
    columnsToDrop.add("lname");
    columnsToDrop.add("company");
    columnsToDrop.add("address1");
    columnsToDrop.add("address2");
    columnsToDrop.add("city");
    columnsToDrop.add("state");
    columnsToDrop.add("order_num");
    columnsToDrop.add("order_date");
    columnsToDrop.add("customer_num");
    columnsToDrop.add("ship_instruct");
    columnsToDrop.add("backlog");
    columnsToDrop.add("po_num");
    columnsToDrop.add("ship_date");
    columnsToDrop.add("ship_weight");
    columnsToDrop.add("ship_charge");
    columnsToDrop.add("paid_date");
    columnsToDrop.add("time_to_ship");
    columnsToDrop.add("item_num");
    columnsToDrop.add("quantity");
    columnsToDrop.add("total_price");

然后在一个操作中执行以下操作:

  1. 丢弃所有列。注意如何将一个 Java 列表转换为用于 drop() 的 Scala 序列
  2. 使用 Spark SQL 语法进行过滤。
Dataset<Row> unsoldProductsDf = allDf
        .drop(JavaConversions.asScalaBuffer(columnsToDrop))
        .filter("order_num IS NULL")
        .filter("description IS NOT NULL");
    unsoldProductsDf.cache();
    System.out.println("We have " + unsoldProductsDf.count()
        + " unsold references in our warehouse, time to do something!");
    unsoldProductsDf.show();

您现在已经知道您的仓库中有多少产品引用是 never sold:

There are 35 unsold references in this warehouse, it's time to do something! 
+---------+---------+---------------+----------+----+---------------+
|stock_num|manu_code|    description|unit_price|unit|     unit_descr|
+---------+---------+---------------+----------+----+---------------+
|      309|      SHM|ear drops      |     40.00|case|20/case        |
|      312|      SHM|racer goggles  |     96.00|box |12/box         |
|      203|      NKL|irons/wedge    |    670.00|case|2 sets/case    |
|      102|      PRC|bicycle brakes |    480.00|case|4 sets/case    |
|      302|      HRO|ice pack       |      4.50|each|each           |
|        3|      SHM|baseball bat   |    280.00|case|12/case        |
|      205|      HRO|3 golf balls   |    312.00|case|24/case        |
|      313|      ANZ|swim cap       |     60.00|box |12/box         |
|      110|      ANZ|helmet         |    244.00|case|4/case         |
|      301|      HRO|running shoes  |     42.50|each|each           |
|      108|      SHM|crankset       |     45.00|each|each           |
|      110|      HRO|helmet         |    260.00|case|4/case         |
|      205|      NKL|3 golf balls   |    312.00|case|24/case        |
|      311|      SHM|water gloves   |     48.00|box |4 pairs/box    |
|      310|      SHM|kick board     |     80.00|case|10/case        |
|      303|      KAR|socks          |     36.00|box |24 pairs/box   |
|      310|      ANZ|kick board     |     84.00|case|12/case        |
|      101|      PRC|bicycle tires  |     88.00|box |4/box          |
|      109|      SHM|pedal binding  |    200.00|case|4 pairs/case   |
|      312|      HRO|racer goggles  |     72.00|box |12/box         |
+---------+---------+---------------+----------+----+---------------+
only showing top 20 rows

此仓库中有 35 个 unsold 引用,所以是时候做点什么了!

现在可以将此信息告知销售和购买部门。

学到的知识

本系列中的这篇文章稍微有点长,学习了如何在 Spark 中连接数据,丢弃一些列,缓存一个数据帧,并使用 SQL 在结果数据帧上执行分析。有了此信息,就可以将有关从未售出产品的洞察提供给购买部门,将有关要摆脱哪些产品的洞察提供给销售人员!

继续探索


评论

添加或订阅评论,请先登录注册

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Open source, Big data and analytics
ArticleID=1051667
ArticleTitle=将 Informix 数据引入到 Spark 中,第 3 部分: 复杂分析
publish-date=11012017