Using a spatial join

You can use the Spark DataFrame API and the SpatialJoinScala API to implement spatial joins in your code.

A typical spatial join is an operation on two sets of data with the join being performed on a given spatial feature. For example, you can join two data sets using a within distance operation on two geometry columns. Other forms of join can also be performed, such as relationship-based joins that use functions such as ST_Contains, ST_Crosses, and ST_Touches.

The following example illustrates how to use the SpatialJoinScala API. It determines which poles (represented as points) hold up a set of wires (represented as line strings).

The following code creates two data frames:
  • polesDf, which contains the data for the poles
  • wiresDf, which contains the data for the wires
val inputFileName = "../../sparksqlspatial/resources/data/poles.csv"
val points = sc.textFile(inputFileName).cache()
	
val fields = ArrayBuffer[StructField]()
fields += DataTypes.createStructField("polesid", DataTypes.IntegerType, true)
fields += DataTypes.createStructField("polesgeometry", new ST_Point, true)
val schema = DataTypes.createStructType(fields)
  
  def poles_record2row(record:String):Row = {
    val fields = record.split(",")
    val reader = new WKTReader(AcceleratedGeometryFactoryEG.getInstance())
    val p = reader.read(fields(1)).asInstanceOf[IPointEG]
    val i = Integer.parseInt(fields(0))
    val row = RowFactory.create(Int.box(i), p)
    row
  }

val rowRDD = points.map(r => poles_record2row(r))
val pointsDf = sqlCtx.createDataFrame(rowRDD, schema)

val wiresInputFileName = "../../sparksqlspatial/resources/data/wires.csv"
val wires = sc.textFile(wiresInputFileName).cache()

val wireFields = new ArrayList[StructField]()
wireFields.add(DataTypes.createStructField("wiresid", DataTypes.IntegerType, true))
wireFields.add(DataTypes.createStructField("wiresgeometry", new ST_Geometry, true))
val wireSchema = DataTypes.createStructType(wireFields)
	
  def wires_record2row(record:String):Row = {
    val fields = record.split("\"")
    val reader = new WKTReader(AcceleratedGeometryFactoryEG.getInstance())
    val geom = reader.read(fields(1)).asInstanceOf[IGeometryEG]
    val id = Integer.parseInt(fields(0).replaceAll(",", ""))
    val row = RowFactory.create(Int.box(id), geom)
    row
  }

val wiresRDD = wires.map(r => wires_record2row(r))
val  wiresDf = sqlCtx.createDataFrame(wiresRDD, wireSchema)
The SpatialJoinScala API provides two types of join functions:
  • Fuzzy join, which can be used for pruning.
  • Distance join, which requires more time and consumes more resources than a fuzzy join, but is more accurate (that is, the result will not contain any false positives).

Fuzzy join

val fuzzyJoin = SpatialJoinScala.fuzzyJoin(conf, pointsDf, wiresDf, geohashBitDepth, distance, 
polesGeometryColumnName, polesPrimaryKeys, wiresGeometryColumnName, wiresPrimaryKeys, geohashColumnName)

System.out.println(fuzzyJoin.count())
The function call for a fuzzy join specifies the following parameters:
conf
The SparkSession for creating new data frames.
pointsDf
The first data frame.
wiresDf
The second data frame, which is joined with the first.
geohashBitDepth
The bit depth of the geohash.
distance
The maximum distance in meters between the two geometries to be considered. For a fuzzy join, some of the rows in the result table might correspond to geometries that are slightly farther apart than the specified distance. However, all geometries that are at or within that distance will be included. For a distance join, none of the rows in the result table will correspond to geometries that are farther apart than the specified distance.
polesGeometryColumnName and wiresGeometryColumnName
The names of the columns to be spatially joined. Both must be of data type ST_Geometry.
polesPrimaryKeys and wiresPrimaryKeys
The primary keys of the first and second data frames. These are used to uniquely identify each row in the data frames.
geohashColumnName
The name of the temporary column that is to be created by the fuzzy join call. Specify a name that is different from the names of any of the columns of the input data frames. This column is for internal use only and does not appear in the result.

In the previous example, a fuzzy join across about 400,000 wires and 1.2 million poles results in about 49 million results at a bit depth of 30. This can be filtered even further if the bit depth is increased to 32, in which case the joined frame contains about 14 million results. Note that the increase in bit depth also increases the internal number of rows that are expanded, and that there is a tradeoff between the time required for calculation and the number of results returned.

Distance join

val distanceJoin = SpatialJoinScala.withinDistanceJoin(conf, pointsDf, wiresDf, geohashBitDepth, distance, 
polesGeometryColumnName, polesPrimaryKeys, wiresGeometryColumnName, wiresPrimaryKeys, geohashColumnName, distanceColumnName)

System.out.println(distanceJoin.count())
The function call for a distance join specifies, in addition to the parameters for a fuzzy join, the following parameter:
distanceColumnName
The name of the column for the computed actual separation distance, which is returned as an additional column in the output data frame.