目次


Bluemix 上の Spark を利用して、ブラウザー内で気象データを分析する

Comments

Apache Spark は、インメモリー処理機能を備えたオープンソースのクラスター・コンピューティング・フレームワークです。Spark を使用すれば、現在市場に出ている同等のテクノロジーに比べ、最大で 100 倍の速さで分析アプリケーションを実行することができます。

このチュートリアルでは、Bluemix 上で IPython Notebook を備えた Spark インスタンスを作成する方法と、Spark API を使用して気象データを分析する方法を説明します。

2015年 6月、IBM は、オープンソース Apache Spark プロジェクトを支援して IBM Bluemix クラウド・プラットフォーム上で Spark をクラウド・サービスとして利用可能にすることを発表しました。さらに、機械学習ライセンスの下、Spark コミュニティーに IBM SystemML ソフトウェアをリリースしました。これと併せ、IBM は Spark 関連のプロジェクトに従事する 3,500人以上の研究者と開発者を支援するとともに、100 万を超えるデータ・サイエンティストとデータ・エンジニアに Spark を学んでもらうという計画で、Spark テクノロジー・センターをサンフランシスコに開設する予定です。

このチュートリアルでは、Bluemix 上で IPython Notebook を備えた Spark インスタンスを作成する方法と、Spark API を使用して気象データを分析する方法を説明します。

このステップ・バイ・ステップの例により、IBM Bluemix 上ではわずか数行の Python コードを作成するだけで簡単にデータの分析が可能になることを明らかにします。

このチュートリアルに従うために必要となるもの

  • Bluemix アカウント
  • Apache Spark の基礎知識 (あると役立ちますが、必須ではありません)
  • Python の知識 (あると役立ちますが、必須ではありません)

ステップ 1. Bluemix ダッシュボード上で Spark インスタンスを作成する

  1. Bluemix アカウントにログインします (または、無料トライアルにサインアップしてください)。
  2. Bluemix の「CATALOG (カタログ)」を開きます。
  3. 「Data and Analytics (データおよび分析)」セクションで、「Apache Spark」を選択します。
  4. 「Service name (サービス名)」フィールドに、作成するサービス・インスタンスの名前を指定します (例えば、「Apache Spark-Weather」など)。その他のフィールドはデフォルトのままにします。サービスをアプリケーションにバインドする必要はありません。
  5. 「CREATE (作成)」をクリックし、新規サービスのプロビジョニングが完了するまで待ちます。
  6. 右上隅にある「OPEN (開く)」をクリックします。Apache Spark-Weather ページのスクリーンショット
    Apache Spark-Weather ページのスクリーンショット
  7. Spark インスタンスのコンソールが表示されるので、新しく作成したサービスをクリックします。Spark 用のアナリティクス・ページのスクリーンショット
    Spark 用のアナリティクス・ページのスクリーンショット

ステップ 2. Spark インスタンス上に IPython Notebook を作成する

新しい IPython Notebook を作成するか、既存のノートブックをアップロードするか、または Bluemix に用意されているサンプルの 1 つを使用することができます。新しいノートブックを作成する場合は、以下の手順に従ってください。

  1. ページの左下にある「NEW NOTEBOOK (新規ノートブック)」をクリックします。
  2. 「Blank (ブランク)」タブで、新しく作成するノートブックの名前を「Name (名前)」に入力し、「Language (言語)」として「Python」を選択してから「CREATE NOTEBOOK (ノートブックの作成)」をクリックします。「Create Notebook (ノートブックの作成)」ページ
    「Create Notebook (ノートブックの作成)」ページ

ステップ 3. ノートブックにデータをアップロードする

  1. 米国国立環境情報センター (NCEI) のサイト (http://www.ncdc.noaa.gov/data-access/quick-links) にアクセスします。
  2. 2 番目に記載されている「Global Historical Climatology Network-Daily (GHCN-D)」リンクをクリックします。
  3. 「GHCN-Daily FTP Access」リンクをクリックします。
  4. 「by_year」フォルダーのリンクをクリックします。
  5. 一番下のほうにある「2015.cs.gz」リンクまでスクロールダウンし、リンクをクリックしてファイルをダウンロードします。
  6. 適切なユーティリティーを使用してローカル・プラットフォーム上に 2015.cs.gz ファイルを解凍します。解凍された .csv ファイルを任意のテキスト・エディターで開き、ファイルの最初の行として、以下の列見出しを追加します。
    STATION,DATE,METRIC,VALUE,C5,C6,C7,C8
  7. 2015.csv の上記の列には、測候所 ID、日付、収集されたメトリック (降水量、当日の最高気温と最低気温、観測時の気温、降雪量、積雪量など) が示されます。C5 列から C8 列に示されるのは、何らかの追加情報です。データのスクリーンショット
    データのスクリーンショット
  8. .csv ファイルをノートブックにアップロードします。ファイル全体が Object Storage にアップロードされるまで、数分かかります。データ・ソースを追加する画面のスクリーンショット
    データ・ソースを追加する画面のスクリーンショット

ステップ 4. RDD を作成する

次は、SparkContext を使用して、2015.csv ファイルから RDD を作成します。

  1. Object Storage 内に保管したファイルに SparkContext を使用してアクセスする前に、Hadoop 構成を設定します。Hadoop 構成は、以下の関数を使用して設定することができます。
    def: set_hadoop_config(credentials
    prefix = "fs.swift.service." + credentials['name']
    hconf = sc._jsc.hadoopConfiguration()
    hconf.set(prefix + ".auth.url", credentials['auth_url']+'/v2.0/tokens')
    hconf.set(prefix + ".auth.endpoint.prefix", "endpoints")
    hconf.set(prefix + ".tenant", credentials['project_id'])
    hconf.set(prefix + ".username", credentials['user_id'])
    hconf.set(prefix + ".password", credentials['password'])
    hconf.setInt(prefix + ".http.port", 8080)
    hconf.set(prefix + ".region", credentials['region']
  2. ツールバーの「Insert (挿入)」ボタンをクリックしてセルを挿入し、そこに上記のコードを貼り付けます。その後、「Run (実行)」ボタンをクリックしてコードを実行します。ツールバーのスクリーンショット
    ツールバーのスクリーンショット
  3. コードが実行された後、「In」の隣に括弧で囲まれた数字が表示されます。この数字は、ノートブック全体のコード・セルの実行順序を示します。この例では、[1] になっています。関数のスクリーンショット
    関数のスクリーンショット
  4. 「Insert to code (コードに挿入)」をクリックして新しいコード・セルを作成します。新しく作成されるセル内には、.csv ファイルの資格情報が自動的に貼り付けられます。資格情報名 keystone が設定されているコードのスクリーンショット
    資格情報名 keystone が設定されているコードのスクリーンショット
  5. 提供された資格情報を後で使用できるよう、ドキュメントにコピーします。コードが作成されて、Object Storage にアクセスするために必要なエントリーが入力された状態になりました。後は、credentials の行を貼り付けてセルを実行するだけです。キー「'name'」には、任意のストリングを値として入力することができます。この例では、「'keystone'」という値になっています。
    credentials = {}
    credentials['name'] = 'keystone'
    credentials['auth_url'] = 'https://xxxx.xxxxxxx.xxx'
    credentials['project_id'] = 'xxxxxxxxxxx'
    credentials['region'] = 'xxxxxxxxxxxx'
    credentials['user_id'] = 'xxxxxxxxxxxxxx'
    credentials['password'] = 'xxxxxxxxxxxx'
  6. この時点で以下のコードを実行することで、Hadoop 構成を設定することができます。
        set_hadoop_config(credentials)
  7. ロー・データ・ファイルが Object Storage に保管されている状態で以下のコードを実行すると、ノートブック内に構成された SparkContext を使用してファイルにアクセスすることができます。
        weather = sc.textFile("swift://notebooks.keystone/2015.csv")
  8. 作成した RDD は、ロー・データ・ファイル内の個々の行に対応するストリングの集まりです。重要な点として、RDD は定義されていますが、まだインスタンス化されてはいないことに注意してください。RDD に対して count のようなアクションを適用すると、事実上、RDD が強制的にインスタンス化されます。
        print "Total records in the 2015.csv dataset:", weather.count()
  9. 最初のデータ行を確認するには、同じ RDD に対して以下のアクションを適用することができます。
        print "The first row in the 2015.csv dataset:", weather.first()

    以下の結果が出力されるはずです。

    コードと、コードの出力のスクリーンショット
    コードと、コードの出力のスクリーンショット

ステップ 5. データを解析して分析する

  1. データの処理を実際に開始するには、データを解析して列にする必要があります。それには、RDD に含まれる各行を、コンマで行を分割する関数にマッピングします。
        weatherParse = weather.map(lambda line : line.split(","))

    Python の lambda 表記は、名前にバインドされない匿名関数を作成するために使用されます。上記のコードでは、この概念を使用して、関数をパラメーターとしてマップ関数に渡しています。この匿名関数は、RDD weather から各行を受け取り、その行をコンマで区切って分離します。

    これにより、新しい RDD weatherParse はストリングのリストからなるリストとして定義されます。weatherParse に含まれる各リストは、weather 内の 1 つの行に対応しています。その行を構成する個々の要素は、各リストに含まれるストリングで表されます。

  2. 最初のリストを簡単に表示するには、以下の関数を使用します。
        weatherParse.first()
  3. ここで、この最初のリストの個々の要素を見てみましょう。このリストでは、最初のエントリーがオフセット 0 で始まっています。
        weatherParse.first()[0]
  4. インデックスを基準に他の要素をプルすることもできます。
        weatherParse.first()[2]
    入力と出力が表示された画面のスクリーンショット
    入力と出力が表示された画面のスクリーンショット
  5. 次は、データ・セットを絞り込むために、降水量のデータ値が含まれる行だけを選択します。つまり、METRIC 列が PRCP となっている行です。
  6. RDD weatherPrecp には、ペア (v1, v2) のリストが格納されています。ここで、v1 は測候所 ID、v2 はその測候所の 1 つの降水量データ・ポイント (1 日) です。表 1 に、この構造を示します。
    表 1.
    キー
    測候所 1値 1
    測候所 2値 2
    測候所 1値 3
    測候所 2値 4
    測候所 3値 5
    ... ...
  7. 続いて、このデータ・セットを新しいデータ・セットに変換 (マッピング) します。新しいデータ・セットでは、各行 (データのペア) に値 1 を増補します。表 2 に、この新しい構造を示します。
    表 2.
    キー
    測候所 1(値 1,1)
    測候所 2(値 2,1)
    測候所 1 (値 3,1)
    測候所 2(値 4,1)
    測候所 3(値 5,1)
    ... ...
  8. ここで、以下のコードを実行して weatherPrecpCountByKey を作成します。
    # x[0] is the station
    # x[3] is the precipitation value
    weatherPrecpCountByKey = weatherPrecp.map(lambda x : (x[0], (int(x[3]), 1)))
    weatherPrecpCountByKey.first()

    /li>

    表 2 は、単なる変換過程に過ぎません。表 2 のマッピングを行った理由は、データ・セットをさらに絞り込んで、表 3 に示す形式にするためです。

    表 3.
    キー
    測候所 1(値 1 + 値 3,2)
    測候所 2(値 2 + 値 4,2)
    測候所 3(値 5,1)
    ... ...

    上記の表を基に、値の合計を対応する値の数で除算することにより、各測候所の平均降水量を計算できるようになります。表 3 の形式にするには、Spark API の reduceByKey 関数を使用します。

    weatherPrecpAddByKey = weatherPrecpCountByKey.reduceByKey(lambda v1,v2 : (v1[0]+v2[0], v1[1]+v2[1]))
    weatherPrecpAddByKey.first()
  9. これでついに、各測候所の平均値を計算することができます。合計降水量を測定値の合計数で除算する関数によって weatherPrecpAddByKey RDD をマッピングすることで、weatherAverages RDD を作成します。
    weatherAverages = weatherPrecpAddByKey.map(lambda k: (k[0], k[1][0] / float(k[1][1] ) ) )
    weatherAverages.first()
    コードの In [11] と Out [11] から In [14] と Out [14] までのスクリーンショット
    コードの In [11] と Out [11] から In [14] と Out [14] までのスクリーンショット
  10. 以下のコードを実行すると、最初の 10 件の測候所とそれぞれの平均降水量を出力することができます。
    for pair in weatherAverages.top(10):
        print "Station %s had average precipitations of %f" % (pair[0],pair[1])
  11. 最初の 10 件の測候所と最大平均降水量を出力する場合は、測候所 ID と平均値ペアの順序を入れ替えます。そのために必要なことは、ペアの順序を入れ替えるマップ関数を使用することだけです。
    precTop10=[]
    stationsTop10=[]
    for pair in weatherAverages.map(lambda (x,y) : (y,x)).top(10):
        precTop10.append(pair[0])
        stationsTop10.append(pair[1])
        print "Station %s had average precipitations of %f" % (pair[1],pair[0])
  12. 対話型ノートブックを使用すると、結果を簡単に描画することができます。
    %matplotlib inline
    import numpy as np
    import matplotlib.pyplot as plt
    N = 10
    index = np.arange(N)  
    bar_width = 0.5
    plt.bar(index, precTop10, bar_width,
    color='b')
    plt.xlabel('Stations')
    plt.ylabel('Precipitations')
    plt.title('10 stations with the highest average precipitation')
    plt.xticks(index + bar_width, stationsTop10, rotation=90)
    plt.show()
最大平均降水量の棒グラフ
最大平均降水量の棒グラフ

まとめ

Apache Spark は、まったく新しい機能の数々をデータ・サイエンティスト、ビジネス・アナリスト、アプリケーション開発者に初めて利用できるようにする、次世代の分散データ処理エンジンです。Analytics for Apache Spark は IBM Bluemix に用意されている一般的なツールと連動することから、この Analytics for Apache Spark を使用することで、瞬く間にこの Apache Spark の能力をフル活用できるようになります。このチュートリアルでは、Spark API を使用する IPython Notebook を利用して、実際の気象に関するロー・データを分析する方法を紹介しました。この例をベースとして使用すれば、Bluemix 上の他のアナリティクスも簡単に利用することができます。


ダウンロード可能なリソース


コメント

コメントを登録するにはサインインあるいは登録してください。

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=ビジネス・アナリティクス, Cloud computing
ArticleID=1034946
ArticleTitle=Bluemix 上の Spark を利用して、ブラウザー内で気象データを分析する
publish-date=07142016