目次


InformixのデータをSparkにオフロードする, 第 2 回

基本的なデータ分析

データを別のデータ・ソースで利用する

Comments

コンテンツシリーズ

このコンテンツは全#シリーズのパート#です: InformixのデータをSparkにオフロードする, 第 2 回

このシリーズの続きに乞うご期待。

このコンテンツはシリーズの一部分です:InformixのデータをSparkにオフロードする, 第 2 回

このシリーズの続きに乞うご期待。

このチュートリアル・シリーズの第 1 回では、Informix から Spark に個々のテーブルを移動する方法、データベース全体を Spark 内に取り込む方法、Informix 固有のダイアレクトを作成する方法を説明しました。

この第 2 回で焦点とするのは、基本的なデータ分析によって洞察を得る方法です。Informix の知識はまったく必要ありません。

エンタープライズ対応

前回に続き、Informix の stores_demo データベースを使用します。このデータベースは、エンタープライズ・データベースを模倣したもので、この会社はスポーツ用品の卸売業者として、家族経営のスポーツ店を対象にスポーツ用品を販売しています。このチュートリアルで使用するのは、orders テーブルだけです。以下に、Apache Spark 内にインポートした後の orders テーブルのスキーマを示します。

orders テーブルのスキーマ

上記のデータベース図に関する注: 元のデータベース内には、主キー、外部キー、制約、ネイティブ・データ型 (SERIAL など) があります。このすべてが Spark のデータ型および単純化された構造に変換されるため、インデックス、制約、キーは不要になります。

出荷までの所要時間

最初の演習では、顧客の注文を受けてから、その注文が出荷されるまでにウェアハウスでどれだけの時間がかかっているのかを分析します。

このような分析処理に Spark を使うのは携帯電話を充電するのに原子力発電所を使うようなものですが、このプロセスによって、漸進的に Spark アナリティクスの世界を発見していきます。

コードは GitHub からダウンロードできます。

コードを抽象化して実地検証する

最初のインポートは少々普通ではないと見えるかもしれないことに留意しておいてください。そうなるのは、Spark には多数の定義済み関数があるためです。必要に応じて、org.apache.spark.sql.functions パッケージに含まれる関数リストを参照すると役立ちます。

以下に示すコード内で、datediff* で置き換えると、すべての関数がインポートされますが、このチュートリアルで必要なのは datediff だけです。このコードの処理内容については、すでに察しがついていることでしょう。

import static org.apache.spark.sql.functions.datediff;

インポートされる残りのクラスは、第 1 回で使用したクラスと同様です。コードを実地検証する間、これらのインポートを共有したいと思います。そうでないと、同音異義語の名前を持つクラスを参照することになるので、かなりややこしくなります。

import java.sql.Connection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;

import net.jgp.labs.informix2spark.utils.Config;
import net.jgp.labs.informix2spark.utils.ConfigManager;
import net.jgp.labs.informix2spark.utils.InformixJdbcDialect;
import net.jgp.labs.informix2spark.utils.K;

アプリケーションを開始します。

public class TimeToShipApp {

  public static void main(String[] args) {
    TimeToShipApp app = new TimeToShipApp();
    app.start();
  }

  private void start() {

Spark のローカル・モデルを使用してセッションを開始します。

SparkSession spark = SparkSession
        .builder()
        .appName("Time to Ship")
        .master("local")
        .getOrCreate();

この例には以下のコードは必要ありませんが、Informix ダイアレクトを登録することがベスト・プラクティスです。

JdbcDialect dialect = new InformixJdbcDialect();
    JdbcDialects.registerDialect(dialect);

Informix パラメーターを取得します。

 Config config = ConfigManager.getConfig(K.INFORMIX);

処理対象とするすべてのテーブルのリストを作成します (そのリストをデータ・レイクに追加します)。この例では 1 つのテーブルしか使用しないので、テーブルのリストを作成するまでのことはないと思えるかもしれませんが、この考えは、他の例にも適用できます (また、繰り返すことが学習に役立ちます)。

List<String> tables = new ArrayList<>();
    tables.add("orders");

ご存知のとおり、マップにはキーによるインデックスを付けた値が含まれています。Spark から提供される各データフレームがマップに保管されます、マップ内ではテーブルの名前がキーとなり、データが含まれるデータフレームが値となります。

Map<String, Dataset<Row>> datalake = new HashMap<>();
    for (String table : tables) {
      System.out.print("Loading table [" + table
          + "] ... ");
      Dataset<Row> df = spark.read()
          .format("jdbc")
          .option("url", config.getJdbcUrl())
          .option("dbtable", table)
          .option("user", config.getUser())
          .option("password", config.getPassword())
          .option("driver", config.getDriver())
          .load();

      datalake.put(table, df);
      System.out.println("done.");
    }

ここまでの時点でコードを実行すると、以下の結果になります。

Loading table [orders] ... done.
We have loaded 1 table(s) in our data lake.

これで、データを調査する準備ができました。データの調査には、ordersDF データフレームを使用します。

 Dataset<Row> ordersDf = datalake.get("orders");

withColumn() メソッドを使用して新しい列を作成します。この新しい列は、datediff() という Spark SQL 関数の結果です。この関数はパラメーターとして、終了日と開始日の 2 つを取ります。

ordersDf = ordersDf.withColumn(
        "time_to_ship", 
        datediff(ordersDf.col("ship_date"), ordersDf.col("order_date")));

以下の点に注意してください。

datediff(ordersDf.col("ship_date"), ordersDf.col("order_date"))

上記のコードは、以下のコードと同じではありません。

datediff(ordersDf.col("order_date"), ordersDf.col("ship_date"))

ここで何をしようとしているのかというと、減算です。したがって大きいほうの数値 (つまり新しいほうの日付) を最初に置かなければ、結果が負の値になってしまいます。

現時点でデータフレームのスキーマを出力すると、以下のような内容になっているはずです。

 ordersDf.printSchema();
root
 |-- order_num: integer (nullable = false)
 |-- order_date: date (nullable = true)
 |-- customer_num: integer (nullable = false)
 |-- ship_instruct: string (nullable = true)
 |-- backlog: string (nullable = true)
 |-- po_num: string (nullable = true)
 |-- ship_date: date (nullable = true)
 |-- ship_weight: decimal(8,2) (nullable = true)
 |-- ship_charge: decimal(6,2) (nullable = true)
 |-- paid_date: date (nullable = true)
 |-- time_to_ship: integer (nullable = true)

先ほど追加した列を確認できますか?追加した列の名前は time_to_ship で、その型は整数です (null になる場合もあります)。

データを見てください。

ordersDf.show(10);
    System.out.println("We have " + ordersDf.count() + " orders");
+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
|order_num|order_date|customer_num|       ship_instruct|backlog|    po_num| ship_date|ship_weight|ship_charge| paid_date|time_to_ship|
+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
|     1001|2008-05-20|         104|express          ...|      n|B77836    |2008-06-01|      20.40|      10.00|2008-07-22|          12|
|     1002|2008-05-21|         101|PO on box; delive...|      n|9270      |2008-05-26|      50.60|      15.30|2008-06-03|           5|
|     1003|2008-05-22|         104|express          ...|      n|B77890    |2008-05-23|      35.60|      10.80|2008-06-14|           1|
|     1004|2008-05-22|         106|ring bell twice  ...|      y|8006      |2008-05-30|      95.80|      19.20|      null|           8|
|     1005|2008-05-24|         116|call before deliv...|      n|2865      |2008-06-09|      80.80|      16.20|2008-06-21|          16|
|     1006|2008-05-30|         112|after 10 am      ...|      y|Q13557    |      null|      70.80|      14.20|      null|        null|
|     1007|2008-05-31|         117|                null|      n|278693    |2008-06-05|     125.90|      25.20|      null|           5|
|     1008|2008-06-07|         110|closed Monday    ...|      y|LZ230     |2008-07-06|      45.60|      13.80|2008-07-21|          29|
|     1009|2008-06-14|         111|next door to groc...|      n|4745      |2008-06-21|      20.40|      10.00|2008-08-21|           7|
|     1010|2008-06-17|         115|deliver 776 King ...|      n|429Q      |2008-06-29|      40.60|      12.30|2008-08-22|          12|
+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
only showing top 10 rows
We have 23 orders

time_to_ship 列の注文番号 1006 が null になっています。これは、まだ注文が出荷されていないためです。出荷までの所要時間を分析すると、これが Amazon Prime でないことは明らかにわかります!

次に、null 値を取り除く必要がありますが、データフレームは不変です。つまり、データを変更することはできません。この問題を回避するには、新しいデータフレームを作成して、不要な値を除外するという方法をとる必要があります。データフレームは SQL テーブルではないので、DELETE FROM は使えません。

 Dataset<Row> ordersDf2 = ordersDf.filter(
        "time_to_ship IS NOT NULL");
    ordersDf2.printSchema();
    ordersDf2.show(5);
    System.out.println("We have " + ordersDf2.count()
        + " delivered orders");

  }
}

出力は以下のようになります。

+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
|order_num|order_date|customer_num|       ship_instruct|backlog|    po_num| ship_date|ship_weight|ship_charge| paid_date|time_to_ship|
+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
|     1001|2008-05-20|         104|express          ...|      n|B77836    |2008-06-01|      20.40|      10.00|2008-07-22|          12|
|     1002|2008-05-21|         101|PO on box; delive...|      n|9270      |2008-05-26|      50.60|      15.30|2008-06-03|           5|
|     1003|2008-05-22|         104|express          ...|      n|B77890    |2008-05-23|      35.60|      10.80|2008-06-14|           1|
|     1004|2008-05-22|         106|ring bell twice  ...|      y|8006      |2008-05-30|      95.80|      19.20|      null|           8|
|     1005|2008-05-24|         116|call before deliv...|      n|2865      |2008-06-09|      80.80|      16.20|2008-06-21|          16|
+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
only showing top 5 rows
We have 22 delivered orders

実際にはどのように処理されたのでしょうか?

Spark は、遅延評価の概念に基づいています。私が考えるに、遅延評価は、子供に対して部屋を片付けなさい、洗濯物を洗濯籠に入れておきなさい、本を棚に戻しなさいなどと指示するようなものです。通常は、(場合によっては、声を少々低くして)「今すぐやりなさい」と指示するまで子供は動きません。子供は行動を開始する合図として、「今すぐ」という言葉を待っているのです。Spark にもまさに同じことが当てはまります。

Spark の場合、この合図を送る親の役割を果たすのは、Catalyst です。Catalyst は最適化ツールであり、変換を実行するのに最適な方法を見つけます。これも例えて言うなら、子供が本を棚に戻すには、その前に、棚にとりあえず置いてある衣服を片付けなければならないことと同じです。バージョン 2.2 で、Spark はコスト・ベースの最適化ツールを Catalyst に追加しました。この追加は、IBM による重要な貢献です。

データフレームとは何か?

Apache Spark でのデータフレームは、リレーショナル・データベースの分野でのテーブルとほぼ同じです。ただし、「ほぼ」という言葉を欠かすことはできません。

  • データは不変であり、変更されることはありません。SQL を使用する場合、UPDATE を使用して簡単にデータを更新したり、INSERT を使用して簡単にテーブル内に新しい行を挿入したりできます。けれどもデータフレームはそれとは異なり、更新や挿入のアクションを実行するには、そのたびに新しいデータフレームを作成する必要があります。この作業は頻繁に行うことになる作業です。
  • データフレームでは、テーブルと同じようにメタデータを使用します。列には名前と型のプロパティー、そして値が必須であるかどうかを指定する null 可能プロパティーがあります。一方、テーブルとは異なり、データフレームにインデックス、外部キー、複合キー、トリガーはありません。
  • Spark に備わっている分析に関する特性から、データフレームでは簡単に列を追加して、変換を実行することができます。これは、RDBMS テーブルを使用して行うのは難しい場合があります。

データフレーム API は、開発者が使用するものです。開発者は、Facebook にアクセスするよりも、Class Dataset ページにアクセスする回数のほうが多くなるはずです!この API から直接、列の情報にアクセスして、join や union などの処理を実行することができます (シリーズの第 3 回では、この API について詳しく見ていきます)。

Java では、データフレームは Dataset<行> として実装されます。ストレージはクラスター内のパーティションに依存します。Spark は下位レベルの API を提供しているため、再パーティション化して実行プランを理解することができますが、これについては、この記事の範囲外です。Spark 2 以降、ストレージの処理は、Java/Scala オブジェクトよりも効率的な Tungsten という名前のコンポーネントによって行われるようになっています。

図 2 に、API 実装と、Spark データフレームのストレージを示します。

API 実装と、Spark データフレームのストレージを示す図
API 実装と、Spark データフレームのストレージを示す図

今回学んだ内容

今回のチュートリアルで、Spark でデータを保管する方法と、保管されたデータをデータフレーム API で処理する方法について理解を深めていただけたことを願います。このチュートリアルでは、組み込み関数とそれらの関数をどこで見つけられるかについても説明しました。そして何よりも重要なことに、皆さんは今回、より複雑なアナリティクスと機会学習に取り組むための基礎知識を身につけました。このシリーズの次回のチュートリアルでは、その知識を実践に移します。

詳細を調べるには


ダウンロード可能なリソース


コメント

コメントを登録するにはサインインあるいは登録してください。

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=Information Management, Open source
ArticleID=1058173
ArticleTitle=InformixのデータをSparkにオフロードする, 第 2 回: 基本的なデータ分析
publish-date=02222018