Loading spatial data
The best way to load spatial data into a data frame for analysis depends on the type and contents of the file containing that data.
Loading spatial data from a CSV file that contains text
The file resources/data/hospitals.csv contains data that represents hospitals. The location data is provided as longitude and latitude values. These can be mapped to an ST_Point in the Spark SQL world from a simple RDD.
```Scala
// load the hospitals info and map it to a table
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.functions.udf
// the ellipsoidal point classes are assumed to live in the same package as IGeometryEG (see the geo schema below)
import com.ibm.research.st.datamodel.geometry.ellipsoidal.{IPointEG, PointEG}

// define the file name to read from
val inputFileName = "../../sparksqlspatial/resources/data/hospitals.csv"

// define the known schema for the file
val input_schema = StructType(List(
  StructField("id", DataTypes.IntegerType, true),
  StructField("name", DataTypes.StringType, true),
  StructField("city", DataTypes.StringType, true),
  StructField("state", DataTypes.StringType, true),
  StructField("lon", DataTypes.DoubleType, true),
  StructField("lat", DataTypes.DoubleType, true)))

// read the csv file applying the schema
val hospitals = spark.read
  .format("csv")
  .option("delimiter", ",")
  .schema(input_schema)
  .csv(inputFileName)

// define a UDF to transform lat/lon into an ellipsoidal geometry point
val toPointEG = udf((lat: Double, lon: Double) => new PointEG(lat, lon).asInstanceOf[IPointEG])

// add a column "location" to hold the ellipsoidal geometry derived from the lon and lat columns
import sqlCtx.implicits._
val hospitalsDf = hospitals.withColumn("location", toPointEG($"lat", $"lon"))
```
After the data frame has been created, you can register a table in the SQLContext and execute SQL commands.
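For example, a minimal sketch (the view name hospitals_table and the state = 'TN' filter are illustrative assumptions, not part of the sample data set description):
```Scala
// register the data frame as a temporary view so it can be referenced from SQL
hospitalsDf.createOrReplaceTempView("hospitals_table")

// run an ordinary SQL query against the registered view
val tnHospitals = spark.sql(
  "SELECT name, city, location FROM hospitals_table WHERE state = 'TN'")
tnHospitals.show()
```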
Loading data from a CSV file that contains WKT strings
The file resources/data/hospitalsWithHeader.csv contains data that represents hospitals. The first step reads the CSV file and infers a string type for the geo location, which comes as a WKT string.
```Scala
val inputFileName = "../../sparksqlspatial/resources/data/hospitalsWithHeader.csv"
val hospitals = spark.read
  .format("csv")
  .option("delimiter", ",")
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(inputFileName)
hospitals.printSchema()
```
The detected schema is:
```
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- location: string (nullable = true)
```
The location data contain WKT strings like:
```
POINT (+0036.077843 -0086.721939)
```
The second step transforms the string types that carry WKT strings into geo types.
```Scala
val geo_schema = SpatialUtils.inferGeoSchema(hospitals)
geo_schema.printTreeString()
```
The geo schema is:
```
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- location: com.ibm.research.st.datamodel.geometry.ellipsoidal.IGeometryEG (nullable = true)
```
To apply the new schema to the data (i.e. convert the WKT strings into geo data for further use in analysis):
```Scala
try {
  // convert each row according to the geo schema (parses the WKT location string)
  val hospitalsRdd = hospitals.rdd.map(r => SpatialUtils.applyGeoSchema(r, geo_schema))
  // build a new data frame whose location column now carries the geo type
  val df_result = sqlCtx.createDataFrame(hospitalsRdd, geo_schema)
}
catch {
  case _: Throwable =>
    System.out.print("Found geo data that do not match WKT syntax.\n")
}
```
The location data are transformed from WKT strings into geo format in ellipsoidal geometry:
```
PointEG: lat=-86.721939, long=36.077843
```
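As a quick sanity check, the typed column can be inspected directly. This is only a sketch: it assumes the conversion above succeeded and that df_result is declared in a scope where it remains visible (e.g. outside the try block).
```Scala
// take the first converted row; "location" is now a geometry object rather than a string
val firstRow = df_result.first()
println(firstRow.getAs[AnyRef]("location"))
// expected output along the lines of: PointEG: lat=..., long=...
```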