目次


InformixのデータをSparkにオフロードする, 第 1 回

データの収集

データを別のデータ・ソースに対して利用する

Comments

コンテンツシリーズ

このコンテンツは全#シリーズのパート#です: InformixのデータをSparkにオフロードする, 第 1 回

このシリーズの続きに乞うご期待。

このコンテンツはシリーズの一部分です:InformixのデータをSparkにオフロードする, 第 1 回

このシリーズの続きに乞うご期待。

簡単な背景情報

最近は Spark に関する文献は溢れるほどあります。そこで、このチュートリアルで取り上げるもう 1 つの歴史ある製品、Informix について簡単に説明します。Informix は生まれながらのリレーショナル・データベース・システム (RDS) であり、1980 年に登場してから瞬く間に UNIX システム上の基準リレーショナル・データベース管理システム (RDBMS) になりました。IBM は 2 つの段階 (2001 年と 2005 年) をかけて Imformix 社を買収し、同社の名前を冠した Informix データベースを IBM の優れたデータ管理製品ポートフォリオに追加しました。幸い、一部の人々が言うように、その忠実で熱意あるユーザー・ベースのおかげで Informix は IBM ポートフォリオの中でも活気を失わず、さまざまなイノベーションを実現していきました。例えば、XML と JSON のサポート、NoSQL の統合などのイノベーションによって、初のエンタープライズ・ハイブリッド・データベースにもなっています。Informix はトランザクション・アプリケーションには理想的なデータベースであり、Walmart、Cisco、The Home Depot、DHL などの多くの企業で引き続き盛んに使われています。世界最大のスーパーマーケット・チェーンでも、オレンジを基調色とした定番のホーム・センターでも、買い物をすると必ず、そのトランザクションが現地の Informix データベース内に記録されます。各地にある Informix データベース内に保存されたデータは、定期的にアーカンソー州またはジョージア州にある本部に送られて統合されます。

ここ数年の間に Informix Warehouse Accelerator (IWA) によってメモリーのサポートが追加されたとは言え、世界的な傾向としては、異機種混合のサポートおよびデータ・レイク指向のアークテクチャーへの移行が進んでいます。そのようなアーキテクチャーは Apache Spark が得意とする領域なので、「Informix データベース内のデータを Spark にオフロードできないだろうか」と考えるのも当然でしょう。

このチュートリアルで、Informix からデータを収集する方法を学んでください。このシリーズの第 2 回では、他のデータ・ソースを追加して、それらのデータを分析する方法を説明します。

チュートリアルの手順に従うには、以下のものが必要です。

  • Spark 2.1.1
  • Informix 12.10.FC8
  • Java 1.8.0_60-b27
  • Informix JDBC driver 4.10.8.1
  • MacOS Sierra 10.12.5

注: すべてのコードは GitHub から入手できます。

データフレームに特定の顧客を取り込む

図 1. データフレームへの顧客の追加
データフレームに顧客を追加する方法を示す図
データフレームに顧客を追加する方法を示す図

この最初のセクションでは、stores_demo という周知のサンプル・データベースの顧客テーブルに接続する方法を説明します。

構文はかなり単純ですが、陥りやすい罠が 1 つあります。接続では DELIMIDENT=Y を指定して、SQL クエリーが正しく作成されるようにしてください。DELIMIDENT の詳細については、このリンク先のページを参照してください。

以下に示すコードは、BasicCustomerLoader.java から抜粋しています。

まず、ローカル・モード (またはクラスター) に接続して Spark セッションを作成します。

  SparkSession spark = SparkSession
      .builder()
      .appName("Stores Customer")
      .master("local")
      .getOrCreate();

テーブルからデータを読み取って、データフレーム内に保管します。

  Dataset<Row> df = spark
    .read()
    .format("jdbc")
    .option(
      "url",
      "jdbc:informix-sqli://[::1]:33378/stores_demo:IFXHOST=lo_informix1210;DELIMIDENT=Y")
    .option("dbtable", "customer")
    .option("user", "informix")
    .option("password", "in4mix")
    .load();
  df.printSchema();
  df.show(5);

スキーマは以下のとおりです。

 |-- customer_num: long (nullable = false)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- company: string (nullable = true)
 |-- address1: string (nullable = true)
 |-- address2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- phone: string (nullable = true)

このスキーマによるデータは以下のように表示されます (最初の 5 行だけを表示)。

+------------+---------------+---------------+--------------------+-------...
|customer_num|          fname|          lname|             company|       ...
+------------+---------------+---------------+--------------------+-------...
|         101|Ludwig         |Pauli          |All Sports Supplies |213 Ers…
|         102|Carole         |Sadler         |Sports Spot         |785 Gea…
|         103|Philip         |Currie         |Phil's Sports       |654 Pop…
|         104|Anthony        |Higgins        |Play Ball!          |East Sh…
|         105|Raymond        |Vector         |Los Altos Sports    |1899 La…
+------------+---------------+---------------+--------------------+-------...

データベース全体を Spark 内にダンプする

図 2. Spark 内へのデータベース全体のダンプ
データベース全体を Spark 内にダンプする方法を示す図
データベース全体を Spark 内にダンプする方法を示す図

JDBC Metadata API はすべてのテーブルをリストアップしてから、それらのテーブルを一度に 1 つずつ Spark にロードします。テーブルとビューだけをロードして、システム・テーブル、同義語、別名などは拒否します。

ただし、データをインポートすると、Informix 内に存在する一部の不透明なデータ型が Spark によって認識されません。この問題を回避するには、Infomix に固有のダイアレクトを作成する必要があります。JDBC ダイアレクトは、データが入力 (および出力) される際に Spark を支援するだけでなく、一部のパラメーターをセットアップする際にも役立ちます。

変換ダイアレクトを作成する

以下に示すコードは、net.jgp.labs.informix2spark.utils パッケージに含まれる InformixJdbcDialect.java から抜粋しています。

Spark と Scala から、いくつかのパッケージをインポートする必要があります。

import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MetadataBuilder;

import scala.Option;

dialect クラスは JdbcDialect を継承しますが、このチュートリアルでは継承されたメソッドの一部のみをオーバーライドすることにします。

public class InformixJdbcDialect extends JdbcDialect {

主要なメソッドは canHandle です。このメソッドは JDBC URL に基づいて、そのダイアレクトが使用するのに適切なものであるかどうかを判断します。この場合の判断基準は、URL が jdbc:informix-sqli で始まっているかどうかです。これは、Informix データベースを使用していることを明確に示す指標となります。

  @Override
  public boolean canHandle(String url) {
    return url.startsWith("jdbc:informix-sqli");
  }

2 番目のメソッドは getCatalystType です。このメソッドは、JDBC ドライバーから取得したデータ型に基づいて、Catalyst が理解するデータ型を返します。このメソッドから返されるリストに、stores_demo 内のすべてのデータ型が含まれています。アプリケーションが他にもデータ型を使用する場合は、このリストにそのデータ型を追加する必要があります。

@Override
  public Option<DataType> getCatalystType(int sqlType,
      String typeName, int size, MetadataBuilder md) {
    if (typeName.toLowerCase().compareTo("calendar") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().compareTo(
        "calendarpattern") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().compareTo(
        "se_metadata") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().compareTo(
        "sysbldsqltext") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().startsWith("timeseries")) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().compareTo("st_point") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().compareTo("tspartitiondesc_t") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    return Option.empty();
  }

このメソッドが返す Option は、Scala からのものであることに注意してください。

テーブルのリストを取得する

次は、すべてのテーブルを調べます。

以下に示すコードは、net.jgp.labs.informix2spark.l100 パッケージに含まれる DatabaseLoader.java から抜粋しています。読みやすくするために、例外処理の部分は削除しました。

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;

import net.jgp.labs.informix2spark.utils.Config;
import net.jgp.labs.informix2spark.utils.ConfigManager;
import net.jgp.labs.informix2spark.utils.InformixJdbcDialect;
import net.jgp.labs.informix2spark.utils.K;

public class DatabaseLoader {

  private List<String> getTables(Connection connection) {
    List<String> tables = new ArrayList<>();

接続のメタデータを取得します。

    DatabaseMetaData md;
    md = connection.getMetaData();

そこからテーブルのクエリーを実行します。この構文により、すべてのテーブルが返されます。

    ResultSet rs;
    rs = md.getTables(null, null, "%", null);

これで、通常の結果セットと同じようにメタデータの結果セットを参照できます。

    while (rs.next()) {

テーブル名は 3 列目にあります。

      String tableName = rs.getString(3);

テーブルのタイプは 4 列目にあります。

      String tableType = rs.getString(4).toLowerCase();
      System.out.print("Table [" + tableName + "] ... ");

テーブルとビューだけを保持します。テーブルの他のタイプとしては、システム・テーブル、グローバル一時、ローカル一時、別名、同義語があります。

      if (tableType.compareTo("table") == 0
          || tableType.compareTo("view") == 0) {
        tables.add(tableName);
        System.out.println("is in (" + tableType + ").");
      } else {
        System.out.println("is out (" + tableType + ").");
      }
    }

    return tables;
  }
}

CPU サイクルとメモリーを使用する

必要なテーブルのリストを取得した後は、すべての作業を 1 つにまとめてデータフレームのマッピングを作成します。

以下に示すコードは、net.jgp.labs.informix2spark.l100 パッケージに含まれる DatabaseLoader.java から抜粋しています。読みやすくするために、例外処理と一部の状態テストの部分は削除しました。

  private void start() {

ローカル・モードで Spark に接続します。

    SparkSession spark = SparkSession
        .builder()
        .appName("Stores Data")
        .master("local")
        .getOrCreate();

ビルダーを使用したこの小さな構成オブジェクトが、接続の管理を容易にします。

    Config config = ConfigManager.getConfig(K.INFORMIX);
    Connection connection = config.getConnection();

すべてのテーブルを取得します。

    List<String> tables = getTables(connection);
    if (tables.isEmpty()) {
      return;
    }

ダイアレクトを定義して Spark 内で登録します。

    JdbcDialect dialect = new InformixJdbcDialect();
    JdbcDialects.registerDialect(dialect);

マッピングを作成します。このマッピングには、テーブル名とデータフレームによるインデックスを付けます。

    Map<String, Dataset<Row>> database = new HashMap<>();

すべてのテーブルを調べます。

    for (String table : tables) {
      System.out.print("Loading table [" + table
          + "] ... ");

毎回テーブル名を変えるという点を除き、前と同じ原則に従います。

    Dataset<Row> df = spark
      	.read()
  	    .format("jdbc")
  	    .option("url", config.getJdbcUrl())
  	    .option("dbtable", table)
  	    .option("user", config.getUser())
  	    .option("password", config.getPassword())
  	    .option("driver", config.getDriver())
  	    .load();
      database.put(table, df);
      System.out.println("done");
    }

無作為のテーブルを選び (以下の例では、state テーブル)、そのテーブルを分析して最初の 5 行を分析結果として出力します。

    System.out.println("We have " + database.size()
        + " table(s) in our database");
    Dataset<Row> df = database.get("state");

    df.printSchema();
    System.out.println("Number of rows in state: " + df
        .count());
    df.show(5);
  }

このプログラムを実行すると、以下のように出力されます。

Table [sysaggregates] ... is out (system table).
Table [sysams] ... is out (system table).
…
Table [call_type] ... is in (table).
Table [catalog] ... is in (table).
Table [classes] ... is in (table).
Table [cust_calls] ... is in (table).
Table [customer] ... is in (table).
Table [customer_ts_data] ... is in (table).
Table [employee] ... is in (table).
Table [ext_customer] ... is in (table).
Table [items] ... is in (table).
Table [manufact] ... is in (table).
Table [orders] ... is in (table).
Table [se_metadatatable] ... is in (table).
Table [se_views] ... is in (table).
…
Loading table [customer] ... done
Loading table [customer_ts_data] ... done
Loading table [employee] ... done
Loading table [ext_customer] ... done
Loading table [items] ... done
Loading table [manufact] ... done
Loading table [orders] ... done
Loading table [se_metadatatable] ... done
Loading table [se_views] ... done
Loading table [state] ... done
Loading table [stock] ... done
…
We have 45 table(s) in our database

root
 |-- code: string (nullable = true)
 |-- sname: string (nullable = true)

分析結果として「random」テーブルの最初の 5 行だけが示されます。

+----+---------------+
|code|          sname|
+----+---------------+
|  AK|Alaska         |
|  HI|Hawaii         |
|  CA|California     |
|  OR|Oregon         |
|  WA|Washington     |
+----+---------------+

次のステップ

これで、このデータに関してアナリティクスを行う準備ができましたが、それはまた別の学習目的となります。

このチュートリアルでは、標準的な Java コードと標準的な JDBC メソッドを使用しました。このコードは Informix を対象に設計されたものですが、ものの数分で、Db2 などの別の優れたデータベースに適用させることもできます。

現在 HCL に所属しながらも、Informx に関する私の変わった質問にいつも答えてくれた Pradeep Natarajan に感謝します。

詳細を調べるには


ダウンロード可能なリソース


コメント

コメントを登録するにはサインインあるいは登録してください。

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=Information Management, Open source
ArticleID=1058170
ArticleTitle=InformixのデータをSparkにオフロードする, 第 1 回: データの収集
publish-date=02222018