Enriching spatial data

You can use spatial functions to enrich data.

For example, the following example adds, to a list of coordinates, the name of the country to which each belongs, then determine the traffic for each country.
package com.ibm.research.st.spark.sql

import java.io.BufferedReader
import java.io.FileInputStream
import java.io.IOException
import java.io.InputStreamReader
import java.util.ArrayList
import java.util.List

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ListBuffer

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.api.java.JavaPairRDD
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.function.PairFunction
import org.apache.spark.broadcast.Broadcast

import scala.Tuple2

import com.ibm.research.st.STConstants
import com.ibm.research.st.STException
import com.ibm.research.st.algorithms.indexing.ISpatialIndexEG
import com.ibm.research.st.algorithms.indexing.tessellation.TessellationIndexEG
import com.ibm.research.st.datamodel.geometry.ellipsoidal.IBoundingBoxEG
import com.ibm.research.st.datamodel.geometry.ellipsoidal.IGeometryEG
import com.ibm.research.st.datamodel.geometry.ellipsoidal.IPointEG
import com.ibm.research.st.datamodel.geometry.ellipsoidal.impl.BoundingBoxEG
import com.ibm.research.st.datamodel.geometry.ellipsoidal.impl.PointEG
import com.ibm.research.st.datamodel.geometry.ellipsoidal.impl.accelerate.AcceleratedGeometryFactoryEG
import com.ibm.research.st.io.wkt.WKTReader

@throws (classOf[IOException]) 
@throws (classOf[STException])
object CountryIndexing extends App {

	// read in the country file and then create a spatial index, after which
	// we will create a broadcast variable out of it
	val bbox:IBoundingBoxEG = new BoundingBoxEG(-85, STConstants.MIN_LONGITUDE,
				85, STConstants.MAX_LONGITUDE)
	final val spatialIndex:ISpatialIndexEG[String]  = new TessellationIndexEG[String](bbox, 100 * 1000)

	val fileName = "../../sparksqlspatial/resources/data/world.json.fixed"
	val fis:FileInputStream = new FileInputStream(fileName)
	val br:BufferedReader = new BufferedReader(new InputStreamReader(fis))

	val reader = new WKTReader(
				AcceleratedGeometryFactoryEG.getInstance())

	var numInserted = 0
	var readLine = br.readLine()
	while (readLine != null) {
			val splitString = readLine.split("\\|")
			val geom = reader.read(splitString(1)).asInstanceOf[IGeometryEG]
			val inserted:Boolean = spatialIndex.put(geom, splitString(0))
			
			if (inserted) {
				numInserted = numInserted + 1
			}
			System.out.println("Processed " + splitString(0) + ", inserted: "
					+ numInserted)
			readLine = br.readLine()
		}

		br.close()

		// distribute on spark
		val conf = SparkSession
      .builder
      .master("local[4]")
      .appName("TagByCountry")
      .getOrCreate()

		val sc = SparkContext.getOrCreate()

		// create broadcasts for both the bounding boxes and the spatial indexes
		final val spatialIndexBc:Broadcast[ISpatialIndexEG[String]] = sc
				.broadcast(spatialIndex)

		val p = ArrayBuffer[IPointEG]()
		// usa
		p += (new PointEG(40.5, -74.3))
		// chile
		p += (new PointEG(-40.5, -73.0))
		// kgz
		p += (new PointEG(40.5, 74.0))
		// aussie
		p += (new PointEG(-23.9, 132.3))

		val points  = sc.parallelize(p)

		@throws (classOf[Exception])
		def mapToPair(p: IPointEG):Tuple2[IPointEG, String] = {
		  val vals = spatialIndexBc.value.containing(p)
			if (vals.size() == 0) {
				return new Tuple2[IPointEG, String](p, "NULL")
			} else {
				return new Tuple2[IPointEG, String](p, vals.get(0))
			}
		}
		 
		val pCMap = points.map(pt => mapToPair(pt))

		for (t <- pCMap.collect()) {
			System.out.println(t._1.toString() + " CONTAINED IN " + t._2)
		}

		sc.stop()
}