空間データのロード

空間データを分析のためにデータ・フレームにロードする最善の方法は、そのデータを含むファイルのタイプおよび内容によって異なります。

テキストが含まれる CSV ファイルからの空間データのロード

ファイル resources/data/hospitals.csv には、病院を表すデータが含まれています。位置データは、経度および緯度のストリングとして提供されます。これらは、単純な RDD から、Spark SQL の ST_Point にマップできます。
```Scala
// load the hospitals info and map it to a table
// define the file name to read from
val inputFileName = "../../sparksqlspatial/resources/data/hospitals.csv"

// define the known schema for the file		
val input_schema = StructType(List(
StructField("id", DataTypes.IntegerType, true), 
StructField("name", DataTypes.StringType, true),
StructField("city", DataTypes.StringType, true),
StructField("state", DataTypes.StringType, true),
StructField("lon", DataTypes.DoubleType, true),
StructField("lat", DataTypes.DoubleType, true)))

// read the csv file applying the schema		
val hospitals = spark.read
      .format("csv")
      .option("delimiter",",")
      .schema(input_schema)
      .csv(inputFileName)

// define a UDF to transform lat lon into ellipsoidal geometry point  
val toPointEG = udf((lat:Double,lon:Double) => {new PointEG(lat,lon).asInstanceOf[IPointEG]})
    
// add a column "location" to hold the ellipsoidal geometry derived from the lon and lat columns
import sqlCtx.implicits._
val hospitalsDf = hospitals.withColumn("location",toPointEG($"lat",$"lon"))

データ・フレームが作成された後、SQLContext で表を登録し、SQL コマンドを実行することができます。

WKT ストリングが含まれる CSV ファイルからのデータのロード

ファイル resources/data/hospitalsWithHeader.csv には、病院を表すデータが含まれています。

最初のステップでは csv ファイルを読み取り、WKT ストリングとして提供される地理的位置のストリング・タイプを推測します。
al inputFileName = "../../sparksqlspatial/resources/data/hospitalsWithHeader.csv"

val hospitals = spark.read
      .format("csv")
      .option("delimiter",",")
      .option("header","true")
      .option("inferSchema", "true")
      .csv(inputFileName)
      
hospitals.printSchema()
検出されたスキーマは以下のとおりです。
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- location: string (nullable = true)
位置データには、以下のような WKT ストリングが含まれています。
POINT (+0036.077843 -0086.721939)
2 番目のステップでは、WKT ストリングを含むストリング・タイプを地理タイプに変換します。
val geo_schema = SpatialUtils.inferGeoSchema(hospitals)
geo_schema.printTreeString
地理スキーマは以下のとおりです。
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- location: com.ibm.research.st.datamodel.geometry.ellipsoidal.IGeometryEG (nullable = true)
新しいスキーマをデータに適用する (分析でさらに使用するために、WKT ストリングを地理データに変換する) には、以下のようにします。
try {
      val hospitalsRdd = hospitals.rdd.map(r => SpatialUtils.applyGeoSchema(r,geo_schema))
      val df_result = sqlCtx.createDataFrame(hospitalsRdd, geo_schema)
}
catch {
    case _: Throwable => {
            System.out.print("Found geo data that do not match WKT syntax.\n") 
    }
}
位置データが WKT ストリングから楕円体の形状の地理的なフォーマットに変換されます。
PointEG: lat=-86.721939, long=36.077843