Using a spatiotemporal join

You can use the Spark DataFrame API and the SpatiotemporalJoin API to implement spatiotemporal joins in your code.

A spatiotemporal join is an operation that joins two sets of data based on both spatial and temporal values. For example, you can join two data sets using a within distance operation on two geometry columns and a within time on columns that contain times or time ranges.

We will explain the SpatiotemporalJoin API using a simple example that combines taxi trips from New York City with a query dataset. The corresponding file is available in the resources/data/ directory (nyctrips-2013-01-01.csv).

The following code creates a data frame with the name taxidf that contains the data contained in the NYC taxi file:
//Create the fields and the schema for the data frame
var taxifields = ArrayBuffer[StructField]()
taxifields += DataTypes.createStructField("rowid", DataTypes.IntegerType, true)
taxifields += DataTypes.createStructField("pickuploc", new ST_Point, true)
taxifields += DataTypes.createStructField("dropoffloc", new ST_Point, true)
taxifields += DataTypes.createStructField("pickupts", DataTypes.LongType, true)
taxifields += DataTypes.createStructField("dropoffts", DataTypes.LongType, true)
taxifields += DataTypes.createStructField("numpass", DataTypes.IntegerType, true)
taxifields += DataTypes.createStructField("traveldistance", DataTypes.DoubleType, true)
taxifields += DataTypes.createStructField("traveltime", DataTypes.DoubleType, true)
val taxischema = DataTypes.createStructType(taxifields)

//Read in the text file
var lines = sc.textFile("../../sparksqlspatial/resources/data/nyctrips-2013-01-01.csv")

//Map the text file to RDD of Rows
  @throws (classOf[STException])
  def line2row(l:String):Row = {
	  
	  val splitString = l.split(",")
	  var o = ArrayBuffer[Any]()

		if (splitString.length < 14) {
			o +=  getNext()
			o +=  new PointEG(0, 0)
			o +=  new PointEG(0, 0)
			o +=  1000l
			o +=  1000l
			o +=  0
			o +=  0.0
			o +=  0.0
		} else {
			o +=  getNext()
			var pickupLoc:IPointEG = new PointEG(
			        java.lang.Double.parseDouble(splitString(11)), 
			        java.lang.Double.parseDouble(splitString(10)))
			if (pickupLoc.isValid()) {
				o += pickupLoc
			} else {
				o += new PointEG(0, 0)
			}
			var dropoffLoc:IPointEG = new PointEG(
			    java.lang.Double.parseDouble(splitString(13)), 
			    java.lang.Double.parseDouble(splitString(12)))
			if (dropoffLoc.isValid()) {
				o += dropoffLoc
			} else {
				o += new PointEG(0, 0)
			}
			o += parseTime(splitString(5))
			o += parseTime(splitString(6))
			o += Integer.parseInt(splitString(7))
			o += java.lang.Double.parseDouble(splitString(8))
			o += java.lang.Double.parseDouble(splitString(9))
		}							
    val row = Row.fromSeq(o.toSeq)
    row
  }

val rows = lines.map(r => line2row(r))

//Create the data frame
val taxidf = spark.createDataFrame(rows, taxischema)
The following code contains a simple query in the form a data frame, and then show how you can achieve a join on this. Spatiotemporal join provides two APIs - a fuzzy join for quick results, and then a more exact join that provides an exact query.
//Create the schema for query
var queryFields = ArrayBuffer[StructField]()
queryFields += DataTypes.createStructField("queryid", DataTypes.IntegerType, true)
queryFields += DataTypes.createStructField("queryGeom", new ST_Geometry, true)
queryFields += DataTypes.createStructField("queryTsStart", DataTypes.LongType, true)
queryFields += DataTypes.createStructField("queryTsEnd", DataTypes.LongType, true)
val querySchema = DataTypes.createStructType(queryFields)

var qo = ArrayBuffer[Any]()
qo += getNext()
qo += new PointEG(40.750936, -73.977105)
qo += parseTime("2013-01-01 08:00:00")
qo += parseTime("2013-01-01 10:00:00")
val queryRow = Row.fromSeq(qo.toSeq)
		
var queryRows = new ArrayList[Row]()
queryRows.add(queryRow)
    
val queryRDD = sc.makeRDD(queryRows)

//query data frame
val queryDf = sqlCtx.createDataFrame(queryRDD, querySchema)
The function call for a spatiotemporal join specifies the following parameters:
spark
The SparkSession for creating new data frames.
df1
The first data frame.
df2
The second data frame (will be joined with the first).
locationBitDepth
The bit depth of the geohash (the choice of a bit depth is a matter of another article, which will provide an in-depth discussion).
timeBitDepth
The bit depth of the time range encoding (the choice of the bit depth is a matter of another article, which will provide an in-depth discussion).
df1GeometryColumName and df2GeometryColumnName
The column names of the geometries (of type ST_Geometry) of df1 and df2 data frames, which will be joined.
df1StartTimeColumnName and df1EndTimeColumnName
The start and end times for that geometry observation, it is possible for the end time column name to be null (i.e., it is non-existent). A similar argument applies to df2 as well.
df1PrimaryKeys and df2PrimaryKeys
The primary keys for the first and the second data frames, respectively.
geohashColumnName
The name of the temporary column that will be created by the call to fuzzyJoin, which will be dropped in the result. This will ensure that the temporary column name is not conflicting with the other column names of the data frames.
timehashColumnName
This is similar to the geohashColumnName, this is the name of the temporary time hash column name that will be created by fuzzyJoin and will be dropped in the result.
locationDistance
The maximum allowable (minimum) distance between two given geometries (df1.geom.distance(df2.geom)).
timeDistance
The maximum allowable (minimum) distance between the time ranges.
locationDistanceColumnName
The return value that has the distance between the two geometries in the joined result.
timeDistanceColumnName
The return value that has the time distance between the two time ranges in the joined result. The time distance is the shortest time difference between two ranges. In other words, given a range [s1, e1] and [s2, e2], if the time ranges overlap, the distance is zero; otherwise, it is the minimum distance between the two ranges:
d(s1, e1, s2, e2) = Min |t2-t1| over all t1, t2 
such that s1 <= t1 <= e1 and s2 <= t2 <= e2
The following example illustrates a call to an exact join. The join is across about 450,000 taxi trips (from a single day) and two hour time range.
val joinedDf = SpatiotemporalJoinScala.join(spark, taxidf, queryDf,
		28, radius, 46, 5 * 60 * 1000, "pickuploc", "pickupts", null,
		df1PrimaryKeys, "queryGeom", "queryTsStart", "queryTsEnd",
		df2PrimaryKeys, "geohash", "distance", "timehash", "timedist")