Loading spatial data

The best way to load spatial data into a data frame for analysis depends on the type and contents of the file containing that data.

Loading spatial data from a CSV file that contains text

The file resources/data/hospitals.csv contains data that represents hospitals. The location data is provided as longitude and latitude columns, which can be mapped to an ellipsoidal geometry point (PointEG) in the Spark SQL world with a simple UDF.
```Scala
// load the hospitals info and map it to a table
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
// PointEG and IPointEG come from the ellipsoidal geometry data model of the spatial library

// define the file name to read from
val inputFileName = "../../sparksqlspatial/resources/data/hospitals.csv"

// define the known schema for the file
val input_schema = StructType(List(
  StructField("id", DataTypes.IntegerType, true),
  StructField("name", DataTypes.StringType, true),
  StructField("city", DataTypes.StringType, true),
  StructField("state", DataTypes.StringType, true),
  StructField("lon", DataTypes.DoubleType, true),
  StructField("lat", DataTypes.DoubleType, true)))

// read the csv file, applying the schema
val hospitals = spark.read
  .option("delimiter", ",")
  .schema(input_schema)
  .csv(inputFileName)

// define a UDF to transform lat/lon into an ellipsoidal geometry point
val toPointEG = udf((lat: Double, lon: Double) => new PointEG(lat, lon).asInstanceOf[IPointEG])

// add a column "location" to hold the ellipsoidal geometry derived from the lon and lat columns
import sqlCtx.implicits._
val hospitalsDf = hospitals.withColumn("location", toPointEG($"lat", $"lon"))
```

After the data frame has been created, you can register a table in the SQLContext and execute SQL commands.
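For example, a minimal sketch of registering and querying the table (the table name and the query itself are illustrative, not part of the sample data set):

```Scala
// register the data frame as a temporary table in the SQLContext
// (createOrReplaceTempView in newer Spark versions)
hospitalsDf.registerTempTable("hospitals")

// run an ordinary SQL query against the registered table
val tnHospitals = sqlCtx.sql("SELECT name, city, location FROM hospitals WHERE state = 'TN'")
tnHospitals.show()
```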

Loading data from a CSV file that contains WKT strings

The file resources/data/hospitalsWithHeader.csv contains data that represents hospitals.

The first step reads the csv file and infers a string type for the geo location column, which arrives as a WKT string.

```Scala
val inputFileName = "../../sparksqlspatial/resources/data/hospitalsWithHeader.csv"

val hospitals = spark.read
  .option("delimiter", ",")
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(inputFileName)

hospitals.printSchema()
```
The detected schema is:
```
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- location: string (nullable = true)
```
The location data contain WKT strings like:
```
POINT (+0036.077843 -0086.721939)
```
The second step transforms the string types that carry WKT strings into geo types.

```Scala
val geo_schema = SpatialUtils.inferGeoSchema(hospitals)
geo_schema.printTreeString
```
The geo schema is:
```
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- location: com.ibm.research.st.datamodel.geometry.ellipsoidal.IGeometryEG (nullable = true)
```
To apply the new schema to the data (i.e., convert the WKT strings into geo data for further use in analysis):

```Scala
try {
  val hospitalsRdd = hospitals.rdd.map(r => SpatialUtils.applyGeoSchema(r, geo_schema))
  val df_result = sqlCtx.createDataFrame(hospitalsRdd, geo_schema)
}
catch {
  case _: Throwable =>
    System.out.print("Found geo data that do not match WKT syntax.\n")
}
```
The location data are transformed from WKT strings into geo format in ellipsoidal geometry:

```
PointEG: lat=-86.721939, long=36.077843
```
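
Once the conversion succeeds, the geo-typed data frame can be used like any other. A minimal sketch, assuming the df_result data frame from the step above is in scope:

```Scala
// inspect a few of the converted geometry objects on the driver;
// the location column now holds IGeometryEG values instead of strings
df_result.select("name", "location").take(3).foreach(println)
```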