目次


InformixのデータをSparkにオフロードする, 第 3 回

複合分析

データを別のデータ・ソースに対して利用する

Comments

コンテンツシリーズ

このコンテンツは全#シリーズのパート#です: InformixのデータをSparkにオフロードする, 第 3 回

このシリーズの続きに乞うご期待。

このコンテンツはシリーズの一部分です:InformixのデータをSparkにオフロードする, 第 3 回

このシリーズの続きに乞うご期待。

これまでの復習と、これから取り組む作業

このチュートリアル・シリーズの第 1 回では、Informix から Spark に個々のテーブルを移動する方法、データベース全体を Spark 内に取り込む方法、Informix 固有のダイアレクトを作成する方法を説明しました。第 2 回では、基本的なアナリティスクによってデータから洞察を引き出す方法にフォーカスしました。

この第 3 回では、結合と、より複雑なクエリーについて詳しく探ります。今回も引き続き Informix のサンプル・データベースを使用しますが、Informix の知識は必要ありません。

さらに多くのデータを扱います!

今回は、以下の 4 つのテーブルを使用します。

  • 顧客
  • 注文
  • 品目 (注文の各明細)
  • 在庫 (販売する商品の説明)
データが取り込まれたテーブルのフロー
データが取り込まれたテーブルのフロー

このデータベースの構造について、唯一注意しなければならない点は、(在庫テーブル内の) 商品は stock_nummanu_code という 2 つのキーで一意に識別されることです。manu_code は製造元を定義するフィールドなので、ここでは使用しません。このテーブルには在庫 (stock) という名前が付けられているものの、在庫/倉庫にある商品の数を示すわけではありません。少々紛らわしいかもしれませんが、レガシー・システムを継承すると、物事が混乱することはよくありますよね?

このテーブルは別の設計によっても実装可能ですが、私は 80 年代後半に設計されたこのデータベースを使用して分析を行うのも挑戦 (面白さ) の一環であると捉えています。チュートリアルを目的にカスタム・データベースを設計するのでは、あまりにも簡単すぎます!

売れ残り

これから作成する 2 番目のアプリケーションでは、データを掘り下げて、参照はされているものの、一度も売られたことのない商品を探します。販売担当者はこれらの商品を売り払うか、製造元に返品するかなどして一掃する必要があります。けれども私たちは IT 分野の人間です。販売担当者や購買部門の独創性には干渉しないことにしましょう。

この UnsoldProductsApp.java というサンプル・コード (net.jgp.labs.informix2spark.l300 パッケージの一部) と前回の TimeToShipApp.java というサンプル・コード (net.jgp.labs.informix2spark.l200 パッケージの一部) には、共通するコードがたくさんあります。完全なサンプル・コードを記載したいところですが、ここでは前回のサンプル・コードを拡張するという設定なので、変更する部分に重点を置きます。

4 つのテーブルが含まれる「データ・レイク」を保持します。

List<String> tables = new ArrayList<>();
tables.add("customer");
tables.add("orders");
tables.add("items");
tables.add("stock");

Spark に、テーブルをロードするように指示します。

Map<String, Dataset<Row>> datalake = new HashMap<>();
    for (String table : tables) {
      System.out.print("Loading table [" + table + "] ... ");
      Dataset<Row> df = spark.read()
          .format("jdbc")
          .option("url", config.getJdbcUrl())
          .option("dbtable", table)
          .option("user", config.getUser())
          .option("password", config.getPassword())
          .option("driver", config.getDriver())
          .load();
      datalake.put(table, df);
      System.out.println("done");
    }

    System.out.println("We have loaded " + datalake.size() + " table(s) in our data lake");

現時点でこのプログラムを実行すると、以下の出力が表示されます。

Loading table [customer] ... done
Loading table [orders] ... done
Loading table [items] ... done
Loading table [stock] ... done
We have loaded 4 table(s) in our data lake

以下のコードを使って、少量のデータを参照することができます。

for (String table : tables) {
      Dataset<Row> df = datalake.get(table);
      System.out.println("Number of rows in " + table + ": " + df.count());
      df.show(10);
      df.printSchema();
    }

上記のコードを実行すると、以下のような出力が表示されるはずです (ここには、4 番目のデータ・セットに関する出力を抜粋しています)。

...
Number of rows in stock: 74
+---------+---------+---------------+----------+----+---------------+
|stock_num|manu_code|    description|unit_price|unit|     unit_descr|
+---------+---------+---------------+----------+----+---------------+
|        1|      HRO|baseball gloves|    250.00|case|10 gloves/case |
|        1|      HSK|baseball gloves|    800.00|case|10 gloves/case |
|        1|      SMT|baseball gloves|    450.00|case|10 gloves/case |
|        2|      HRO|baseball       |    126.00|case|24/case        |
|        3|      HSK|baseball bat   |    240.00|case|12/case        |
|        3|      SHM|baseball bat   |    280.00|case|12/case        |
|        4|      HSK|football       |    960.00|case|24/case        |
|        4|      HRO|football       |    480.00|case|24/case        |
|        5|      NRG|tennis racquet |     28.00|each|each           |
|        5|      SMT|tennis racquet |     25.00|each|each           |
+---------+---------+---------------+----------+----+---------------+
only showing top 10 rows

root
 |-- stock_num: integer (nullable = true)
 |-- manu_code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- unit_price: decimal(6,2) (nullable = true)
 |-- unit: string (nullable = true)
 |-- unit_descr: string (nullable = true)

各セットをデータフレーム(Dataset<Row>) に割り当てることができます。こうすると、後で再利用しやすくなります。

Dataset<Row> ordersDf = datalake.get("orders");
    Dataset<Row> customerDf = datalake.get("customer");
    Dataset<Row> itemsDf = datalake.get("items");
    Dataset<Row> stockDf = datalake.get("stock");

テーブルを結合して、さらにデータを調査する

ご存知の通り、さらに洞察を引き出すにはテーブルを結合する必要があります。

Seq<String> stockColumns = new Set2<>("stock_num", "manu_code").toSeq();

    Dataset<Row> allDf = customerDf
        .join(
            ordersDf, 
            customerDf.col("customer_num").equalTo(ordersDf.col("customer_num")), 
            "full_outer")
        .join(
            itemsDf, 
            ordersDf.col("order_num").equalTo(itemsDf.col("order_num")), 
            "full_outer")
        .join(stockDf, stockColumns, "full_outer")
        .drop(ordersDf.col("customer_num"))
        .drop(itemsDf.col("order_num"))
        .drop(stockDf.col("stock_num"))
        .drop(stockDf.col("manu_code"));

上記のコードではメソッド・チェーンを使っているため、この変換はかなり複雑に見えますが、メソッド・チェーンは次第によく使われるようになってきています。ここでは 4 つの列を使用した 4 つのテーブルの結合を行って、重複する列をドロップしています。

結合を分割する

結合プロセスの理解を深めるには、以下のように、変換をよりアトミックな処理に分割することができます。

Dataset<Row> allDf = customerDf.join(
        ordersDf, 
        customerDf.col("customer_num").equalTo(ordersDf.col("customer_num")), 
        "full_outer");
    allDf = allDf.join(
        itemsDf, 
        ordersDf.col("order_num").equalTo(itemsDf.col("order_num")), 
        "full_outer");
    Seq<String> stockColumns = new Set2<>("stock_num", "manu_code").toSeq();
    allDf = allDf.join(stockDf, stockColumns, "full_outer");
...

分割された処理は、ステップ・バイ・ステップで実行できます。ステップ・バイ・ステップ・バージョンのコードも GitHub リポジトリー内にあります。net.jgp.labs.informix2spark.l301 パッケージに含まれる UnsoldProductsSplitJoinApp クラスを探してください。

これで、customer_num 列を基準に orders テーブルを customer テーブルと結合することで、外部結合を実行できます。

Dataset<Row> allDf = customerDf.join(
        ordersDf, 
        customerDf.col("customer_num").equalTo(ordersDf.col("customer_num")), 
        "full_outer");
    allDf.printSchema();
    allDf.show();

データフレームのスキーマは以下のようになります。

root
 |-- customer_num: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- company: string (nullable = true)
 |-- address1: string (nullable = true)
 |-- address2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- order_num: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- customer_num: integer (nullable = true)
 |-- ship_instruct: string (nullable = true)
 |-- backlog: string (nullable = true)
 |-- po_num: string (nullable = true)
 |-- ship_date: date (nullable = true)
 |-- ship_weight: decimal(8,2) (nullable = true)
 |-- ship_charge: decimal(6,2) (nullable = true)
 |-- paid_date: date (nullable = true)

customer_num 列が 2 つあることに注目してください。一方は、左側の customer テーブル (より正確に言うと、customerDf データフレーム) に属する列で、もう一方は右側の orders テーブル (ordersDf データフレーム) に属する列です。

データフレームを結合するには、Informix や他の RDBMS を使用してテーブルを結合する場合と同じように、条件 (ここでは等価結合を使用) と結合方法 (full_outer、left など) を指定して、左から右への方向で結合します。

以下に記載するデータの出力は、余分な列を削除して単純にしています。データを調べると、customer_num 列は 2 つあるものの、列の値はまったく同じではないことに気付くはずです。なぜでしょうか?

+------------+--------------------+---------+----------+------------+----------+----------+
|customer_num|             company|order_num|order_date|customer_num|    po_num| paid_date|
+------------+--------------------+---------+----------+------------+----------+----------+
|         108|Quinn's Sports      |     null|      null|        null|      null|      null|
|         101|All Sports Supplies |     1002|2008-05-21|         101|9270      |2008-06-03|
|         115|Gold Medal Sports   |     1010|2008-06-17|         115|429Q      |2008-08-22|
|         126|Neelie's Discount Sp|     1022|2008-07-24|         126|W9925     |2008-09-02|
|         103|Phil's Sports       |     null|      null|        null|      null|      null|
|         128|Phoenix University  |     null|      null|        null|      null|      null|
|         122|The Sporting Life   |     1019|2008-07-11|         122|Z55709    |2008-08-06|
|         111|Sports Center       |     1009|2008-06-14|         111|4745      |2008-08-21|
|         120|Century Pro Shop    |     1017|2008-07-09|         120|DM354331  |      null|
|         117|Kids Korner         |     1007|2008-05-31|         117|278693    |      null|
|         117|Kids Korner         |     1012|2008-06-18|         117|278701    |      null|
|         112|Runners & Others    |     1006|2008-05-30|         112|Q13557    |      null|
|         127|Big Blue Bike Shop  |     1023|2008-07-24|         127|KF2961    |2008-08-22|
|         107|Athletic Supplies   |     null|      null|        null|      null|      null|
|         114|Sporting Place      |     null|      null|        null|      null|      null|
|         102|Sports Spot         |     null|      null|        null|      null|      null|
|         113|Sportstown          |     null|      null|        null|      null|      null|
|         121|City Sports         |     1018|2008-07-10|         121|S22942    |2008-08-06|
|         125|Total Fitness Sports|     null|      null|        null|      null|      null|
|         109|Sport Stuff         |     null|      null|        null|      null|      null|
|         105|Los Altos Sports    |     null|      null|        null|      null|      null|
|         110|AA Athletics        |     1008|2008-06-07|         110|LZ230     |2008-07-21|
|         110|AA Athletics        |     1015|2008-06-27|         110|MA003     |2008-08-31|
|         106|Watson & Son        |     1004|2008-05-22|         106|8006      |      null|
|         106|Watson & Son        |     1014|2008-06-25|         106|8052      |2008-07-10|
|         116|Olympic City        |     1005|2008-05-24|         116|2865      |2008-06-21|
|         123|Bay Sports          |     1020|2008-07-11|         123|W2286     |2008-09-20|
|         119|The Triathletes Club|     1016|2008-06-29|         119|PC6782    |      null|
|         131|JGP.net             |     null|      null|        null|      null|      null|
|         118|Blue Ribbon Sports  |     null|      null|        null|      null|      null|
|         124|Putnum's Putters    |     1021|2008-07-23|         124|C3288     |2008-08-22|
|         104|Play Ball!          |     1001|2008-05-20|         104|B77836    |2008-07-22|
|         104|Play Ball!          |     1003|2008-05-22|         104|B77890    |2008-06-14|
|         104|Play Ball!          |     1011|2008-06-18|         104|B77897    |2008-08-29|
|         104|Play Ball!          |     1013|2008-06-22|         104|B77930    |2008-07-31|
+------------+--------------------+---------+----------+------------+----------+----------+

最初の customer_num 列は、左側のデータフレームに属しています。完全な外部結合を行ったため、すべてのデータが取り込まれています。その中には、注文を行っていない顧客のデータまで含まれています。そのため、2 番目の customer_num 列には、行 4 や行 8 のように値が null となっているエントリーがあるというわけです。

後でこの列を参照するとしたら、Spark はどちらの customer_num 列が呼び出されているのか混乱するでしょう。したがって、不要な列 (例えば、2 番目の列) を削除することが理にかなっています.

不要な列を削除するには、データフレームに対して drop() メソッドを呼び出します。引数に注意してください。データフレームの col() メソッドを使用して、ドロップする列を指定します。

allDf = allDf.drop(ordersDf.col("customer_num"));

次は、他の 2 つのテーブルを結合します。

以下の結合処理では、データフレーム (allDf) をそのまま order_num 列の品目と結合してから、余分な列をドロップしています。

allDf = allDf.join(
        itemsDf, 
        ordersDf.col("order_num").equalTo(itemsDf.col("order_num")), 
        "full_outer");
    allDf = allDf.drop(itemsDf.col("order_num"));

Two columns link the last table: stock_num and manu_code. You need to assemble them in a sequence (Seq). The sequence and set used here are Scala objects, but you can use them easily in your Java code.

Seq<String> stockColumns = new Set2<>("stock_num",
                "manu_code").toSeq();

これで、結合の構文がさらに単純になります。

allDf = allDf.join(stockDf, stockColumns, "full_outer");

最後の 2 つの列は削除できます。

allDf = allDf.drop(stockDf.col("stock_num"));
allDf = allDf.drop(stockDf.col("manu_code"));

これで、アトミックな結合処理とメソッド・チェーンの詳しい段階的説明は終わりです。不明な点がある場合は、このチュートリアルの最後にあるコメント・セクションを使って遠慮なく質問してください。

データフレームを完全に活用する

すべてのデータが 1 つのデータフレーム内に収まりました。ここからは、データフレームをドリルスルーして価値を引き出すことができます。

賢明な考え方としては、データが必要になるたびに再度計算しなくても済むように、データをキャッシュします。

allDf.cache();

スキーマを出力したり、数行のデータを表示したりすることもできます。

allDf.printSchema();
allDf.show(5);

スキーマを見ると、重複する行は含まれていないことがわかります。

root
 |-- stock_num: integer (nullable = true)
 |-- manu_code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- unit_price: decimal(6,2) (nullable = true)
 |-- unit: string (nullable = true)
 |-- unit_descr: string (nullable = true)
 |-- stock_num: integer (nullable = true)
 |-- manu_code: string (nullable = true)
 |-- customer_num: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- company: string (nullable = true)
 |-- address1: string (nullable = true)
 |-- address2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- order_num: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- customer_num: integer (nullable = true)
 |-- ship_instruct: string (nullable = true)
 |-- backlog: string (nullable = true)
 |-- po_num: string (nullable = true)
 |-- ship_date: date (nullable = true)
 |-- ship_weight: decimal(8,2) (nullable = true)
 |-- ship_charge: decimal(6,2) (nullable = true)
 |-- paid_date: date (nullable = true)
 |-- item_num: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- total_price: decimal(8,2) (nullable = true)
 |-- description: string (nullable = true)
 |-- unit_price: decimal(6,2) (nullable = true)
 |-- unit: string (nullable = true)
 |-- unit_descr: string (nullable = true)

And the data is indeed wide as there are now all those columns resulting from the merge.
(line numbers useful here)
+---------+---------+------------+---------------+---------------+--------------------+--------------------+--------+---------------+-----+-------+------------------+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+--------+--------+-----------+---------------+----------+----+---------------+
|stock_num|manu_code|customer_num|          fname|          lname|             company|            address1|address2|           city|state|zipcode|             phone|order_num|order_date|customer_num|       ship_instruct|backlog|    po_num| ship_date|ship_weight|ship_charge| paid_date|item_num|quantity|total_price|    description|unit_price|unit|     unit_descr|
+---------+---------+------------+---------------+---------------+--------------------+--------------------+--------+---------------+-----+-------+------------------+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+--------+--------+-----------+---------------+----------+----+---------------+
|      101|      SHM|         119|Bob            |Shorter        |The Triathletes Club|2405 Kings Highway  |    null|Cherry Hill    |   NJ|  08002|609-663-6079      |     1016|2008-06-29|         119|delivery entrance...|      n|PC6782    |2008-07-12|      35.00|      11.80|      null|       1|       2|     136.00|bicycle tires  |     68.00|box |4/box          |
|      309|      SHM|        null|           null|           null|                null|                null|    null|           null| null|   null|              null|     null|      null|        null|                null|   null|      null|      null|       null|       null|      null|    null|    null|       null|ear drops      |     40.00|case|20/case        |
|      109|      PRC|         119|Bob            |Shorter        |The Triathletes Club|2405 Kings Highway  |    null|Cherry Hill    |   NJ|  08002|609-663-6079      |     1016|2008-06-29|         119|delivery entrance...|      n|PC6782    |2008-07-12|      35.00|      11.80|      null|       2|       3|      90.00|pedal binding  |     30.00|case|6 pairs/case   |
|        6|      ANZ|         116|Jean           |Parmelee       |Olympic City        |1104 Spinosa Drive  |    null|Mountain View  |   CA|  94040|415-534-8822      |     1005|2008-05-24|         116|call before deliv...|      n|2865      |2008-06-09|      80.80|      16.20|2008-06-21|       4|       1|      48.00|tennis ball    |     48.00|case|24 cans/case   |
|        6|      ANZ|         115|Alfred         |Grant          |Gold Medal Sports   |776 Gary Avenue     |    null|Menlo Park     |   CA|  94025|415-356-1123      |     1010|2008-06-17|         115|deliver 776 King ...|      n|429Q      |2008-06-29|      40.60|      12.30|2008-08-22|       2|       1|      48.00|tennis ball    |     48.00|case|24 cans/case   |
+---------+---------+------------+---------------+---------------+--------------------+--------------------+--------+---------------+-----+-------+------------------+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+--------+--------+-----------+---------------+----------+----+---------------+
only showing top 5 rows

倉庫を空にする

ここでの演習は、一度も売れたことのない商品を見つけることです。

まずは、不要な列を削除するところから始めます。前の例では、列を一度に 1 つずつ削除しましたが、今回は、Spark のエキスパートとして、もう少し効率的な方法をとれるはずです。同じように、皆さんはメソッド・チェーンをマスターしているので、この処理についても 1 回の呼び出しで完了できるはずです。

最初に、列のリストを作成します。

List<String> columnsToDrop = new ArrayList<>();
    columnsToDrop.add("zipcode");
    columnsToDrop.add("phone");
    columnsToDrop.add("customer_num");
    columnsToDrop.add("fname");
    columnsToDrop.add("lname");
    columnsToDrop.add("company");
    columnsToDrop.add("address1");
    columnsToDrop.add("address2");
    columnsToDrop.add("city");
    columnsToDrop.add("state");
    columnsToDrop.add("order_num");
    columnsToDrop.add("order_date");
    columnsToDrop.add("customer_num");
    columnsToDrop.add("ship_instruct");
    columnsToDrop.add("backlog");
    columnsToDrop.add("po_num");
    columnsToDrop.add("ship_date");
    columnsToDrop.add("ship_weight");
    columnsToDrop.add("ship_charge");
    columnsToDrop.add("paid_date");
    columnsToDrop.add("time_to_ship");
    columnsToDrop.add("item_num");
    columnsToDrop.add("quantity");
    columnsToDrop.add("total_price");

1 回の処理で、以下のことを行います。

  1. すべての列をドロップします。Java のリストを、drop() と組み合わせて使用する Scala シーケンスに変換する方法に注目してください。
  2. Spark SQL 構文を使用してフィルタリングします。
Dataset<Row> unsoldProductsDf = allDf
        .drop(JavaConversions.asScalaBuffer(columnsToDrop))
        .filter("order_num IS NULL")
        .filter("description IS NOT NULL");
    unsoldProductsDf.cache();
    System.out.println("We have " + unsoldProductsDf.count()
        + " unsold references in our warehouse, time to do something!");
    unsoldProductsDf.show();

これで、倉庫内にある、一度も売れたことのない商品の参照がいくつあるかがわかります。

There are 35 unsold references in this warehouse, it's time to do something!
+---------+---------+---------------+----------+----+---------------+
|stock_num|manu_code|    description|unit_price|unit|     unit_descr|
+---------+---------+---------------+----------+----+---------------+
|      309|      SHM|ear drops      |     40.00|case|20/case        |
|      312|      SHM|racer goggles  |     96.00|box |12/box         |
|      203|      NKL|irons/wedge    |    670.00|case|2 sets/case    |
|      102|      PRC|bicycle brakes |    480.00|case|4 sets/case    |
|      302|      HRO|ice pack       |      4.50|each|each           |
|        3|      SHM|baseball bat   |    280.00|case|12/case        |
|      205|      HRO|3 golf balls   |    312.00|case|24/case        |
|      313|      ANZ|swim cap       |     60.00|box |12/box         |
|      110|      ANZ|helmet         |    244.00|case|4/case         |
|      301|      HRO|running shoes  |     42.50|each|each           |
|      108|      SHM|crankset       |     45.00|each|each           |
|      110|      HRO|helmet         |    260.00|case|4/case         |
|      205|      NKL|3 golf balls   |    312.00|case|24/case        |
|      311|      SHM|water gloves   |     48.00|box |4 pairs/box    |
|      310|      SHM|kick board     |     80.00|case|10/case        |
|      303|      KAR|socks          |     36.00|box |24 pairs/box   |
|      310|      ANZ|kick board     |     84.00|case|12/case        |
|      101|      PRC|bicycle tires  |     88.00|box |4/box          |
|      109|      SHM|pedal binding  |    200.00|case|4 pairs/case   |
|      312|      HRO|racer goggles  |     72.00|box |12/box         |
+---------+---------+---------------+----------+----+---------------+
only showing top 20 rows

この倉庫には、unsold の参照は 35 個あるので、そろそろ対処しなければなりません!

今すぐ販売部門と購買部門まで行って、この情報を伝えてください。

今回学んだ内容

このシリーズではやや長めのこの記事では、Spark でデータを結合する方法、列をドロップする方法、データフレームをキャッシュする方法、生成されたデータフレームに上で SQL を使用してアナリティクスを実行する方法を学びました。これだけの知識があれば、購買部門には売れたことのない商品に関する洞察を提供し、販売担当者には処分する商品に関する洞察を提供できるはずです。

詳細を調べるには


ダウンロード可能なリソース


コメント

コメントを登録するにはサインインあるいは登録してください。

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=Information Management, Open source
ArticleID=1058175
ArticleTitle=InformixのデータをSparkにオフロードする, 第 3 回: 複合分析
publish-date=02222018