Enriching spatial data
You can use spatial functions to enrich data.
For example, the code below adds to each coordinate in a list the name of the
country to which it belongs, and then determines the traffic for each
country.
package com.ibm.research.st.spark.sql
import java.io.BufferedReader
import java.io.FileInputStream
import java.io.IOException
import java.io.InputStreamReader
import java.util.ArrayList
import java.util.List
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ListBuffer
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.api.java.JavaPairRDD
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.function.PairFunction
import org.apache.spark.broadcast.Broadcast
import scala.Tuple2
import com.ibm.research.st.STConstants
import com.ibm.research.st.STException
import com.ibm.research.st.algorithms.indexing.ISpatialIndexEG
import com.ibm.research.st.algorithms.indexing.tessellation.TessellationIndexEG
import com.ibm.research.st.datamodel.geometry.ellipsoidal.IBoundingBoxEG
import com.ibm.research.st.datamodel.geometry.ellipsoidal.IGeometryEG
import com.ibm.research.st.datamodel.geometry.ellipsoidal.IPointEG
import com.ibm.research.st.datamodel.geometry.ellipsoidal.impl.BoundingBoxEG
import com.ibm.research.st.datamodel.geometry.ellipsoidal.impl.PointEG
import com.ibm.research.st.datamodel.geometry.ellipsoidal.impl.accelerate.AcceleratedGeometryFactoryEG
import com.ibm.research.st.io.wkt.WKTReader
// Enriches points with the name of the containing country: builds a
// tessellation-based spatial index from a world-countries file, broadcasts
// it over Spark, and tags each sample coordinate with its country.
object CountryIndexing {

  /**
   * Entry point: reads the country geometries, builds and broadcasts a
   * spatial index, then prints the country containing each sample point.
   *
   * (fix: the original placed these @throws annotations on the object
   * declaration, which is not a valid target, and used the `App` trait.)
   *
   * @param args unused
   * @throws IOException if the country file cannot be read
   * @throws STException if a geometry cannot be parsed or indexed
   */
  @throws(classOf[IOException])
  @throws(classOf[STException])
  def main(args: Array[String]): Unit = {
    // Latitude clipped to +/-85 degrees; longitude spans the full range.
    // NOTE(review): presumably the polar caps are excluded because the
    // tessellation does not handle them — confirm with the index docs.
    val bbox: IBoundingBoxEG = new BoundingBoxEG(-85, STConstants.MIN_LONGITUDE,
      85, STConstants.MAX_LONGITUDE)
    // Tessellation cell size of 100 * 1000 (presumably metres, i.e. 100 km).
    val spatialIndex: ISpatialIndexEG[String] =
      new TessellationIndexEG[String](bbox, 100 * 1000)

    val fileName = "../../sparksqlspatial/resources/data/world.json.fixed"
    val br = new BufferedReader(new InputStreamReader(new FileInputStream(fileName)))
    val reader = new WKTReader(AcceleratedGeometryFactoryEG.getInstance())
    var numInserted = 0
    try {
      // Each input line is "<countryName>|<WKT geometry>".
      Iterator.continually(br.readLine()).takeWhile(_ != null).foreach { line =>
        val fields = line.split("\\|")
        val geom = reader.read(fields(1)).asInstanceOf[IGeometryEG]
        if (spatialIndex.put(geom, fields(0))) {
          numInserted += 1
        }
        System.out.println(s"Processed ${fields(0)}, inserted: $numInserted")
      }
    } finally {
      // Always release the stream (fix: the original leaked the reader
      // when parsing or indexing threw before br.close()).
      br.close()
    }

    // Distribute on Spark.
    // (fix: the original bound the SparkSession to a val misleadingly
    // named `conf`; renamed to `spark`.)
    val spark = SparkSession
      .builder
      .master("local[4]")
      .appName("TagByCountry")
      .getOrCreate()
    val sc = spark.sparkContext

    // Broadcast the read-only spatial index so every executor shares one copy
    // instead of serializing it with each task.
    val spatialIndexBc: Broadcast[ISpatialIndexEG[String]] = sc.broadcast(spatialIndex)

    // Sample points (lat, lon): USA, Chile, Kyrgyzstan, Australia.
    val p = ArrayBuffer[IPointEG](
      new PointEG(40.5, -74.3),  // usa
      new PointEG(-40.5, -73.0), // chile
      new PointEG(40.5, 74.0),   // kgz
      new PointEG(-23.9, 132.3)  // aussie
    )
    val points = sc.parallelize(p)

    // Pairs a point with the name of the first country whose geometry
    // contains it, or "NULL" when no country matches (e.g. open ocean).
    // (fix: removed explicit `return` statements; `if` is an expression.)
    def mapToPair(pt: IPointEG): Tuple2[IPointEG, String] = {
      val vals = spatialIndexBc.value.containing(pt)
      if (vals.isEmpty) (pt, "NULL") else (pt, vals.get(0))
    }

    for (t <- points.map(mapToPair).collect()) {
      System.out.println(s"${t._1} CONTAINED IN ${t._2}")
    }
    sc.stop()
  }
}