空間データのロード
空間データを分析のためにデータ・フレームにロードする最善の方法は、そのデータを含むファイルのタイプおよび内容によって異なります。
テキストが含まれる CSV ファイルからの空間データのロード
ファイル
resources/data/hospitals.csv には、病院を表すデータが含まれています。位置データは、経度および緯度のストリングとして提供されます。これらは、単純な RDD から、Spark SQL の ST_Point にマップできます。```Scala
// load the hospitals info and map it to a table
// define the file name to read from
val inputFileName = "../../sparksqlspatial/resources/data/hospitals.csv"
// define the known schema for the file
val input_schema = StructType(List(
StructField("id", DataTypes.IntegerType, true),
StructField("name", DataTypes.StringType, true),
StructField("city", DataTypes.StringType, true),
StructField("state", DataTypes.StringType, true),
StructField("lon", DataTypes.DoubleType, true),
StructField("lat", DataTypes.DoubleType, true)))
// read the csv file applying the schema
val hospitals = spark.read
.format("csv")
.option("delimiter",",")
.schema(input_schema)
.csv(inputFileName)
// define a UDF to transform lat lon into ellipsoidal geometry point
val toPointEG = udf((lat:Double,lon:Double) => {new PointEG(lat,lon).asInstanceOf[IPointEG]})
// add a column "location" to hold the ellipsoidal geometry derived from the lon and lat columns
import sqlCtx.implicits._
val hospitalsDf = hospitals.withColumn("location",toPointEG($"lat",$"lon"))データ・フレームが作成された後、SQLContext で表を登録し、SQL コマンドを実行することができます。
WKT ストリングが含まれる CSV ファイルからのデータのロード
ファイル resources/data/hospitalsWithHeader.csv には、病院を表すデータが含まれています。
最初のステップでは csv ファイルを読み取り、WKT ストリングとして提供される地理的位置のストリング・タイプを推測します。
al inputFileName = "../../sparksqlspatial/resources/data/hospitalsWithHeader.csv"
val hospitals = spark.read
.format("csv")
.option("delimiter",",")
.option("header","true")
.option("inferSchema", "true")
.csv(inputFileName)
hospitals.printSchema()検出されたスキーマは以下のとおりです。
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- city: string (nullable = true)
|-- state: string (nullable = true)
|-- location: string (nullable = true)位置データには、以下のような WKT ストリングが含まれています。
POINT (+0036.077843 -0086.721939)2 番目のステップでは、WKT ストリングを含むストリング・タイプを地理タイプに変換します。
val geo_schema = SpatialUtils.inferGeoSchema(hospitals)
geo_schema.printTreeString地理スキーマは以下のとおりです。
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- city: string (nullable = true)
|-- state: string (nullable = true)
|-- location: com.ibm.research.st.datamodel.geometry.ellipsoidal.IGeometryEG (nullable = true)新しいスキーマをデータに適用する (分析でさらに使用するために、WKT ストリングを地理データに変換する) には、以下のようにします。
try {
val hospitalsRdd = hospitals.rdd.map(r => SpatialUtils.applyGeoSchema(r,geo_schema))
val df_result = sqlCtx.createDataFrame(hospitalsRdd, geo_schema)
}
catch {
case _: Throwable => {
System.out.print("Found geo data that do not match WKT syntax.\n")
}
}位置データが WKT ストリングから楕円体の形状の地理的なフォーマットに変換されます。
PointEG: lat=-86.721939, long=36.077843