Using a spatial join
You can use the Spark DataFrame API and the SpatialJoinScala API to implement spatial joins in your code.
A typical spatial join combines two data sets based on a spatial relationship between their geometries. For example, you can join two data sets using a within-distance operation on two geometry columns. Other forms of join are also possible, such as relationship-based joins that use functions such as ST_Contains, ST_Crosses, and ST_Touches.
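To make the within-distance idea concrete, the following minimal sketch joins two small collections with a naive nested loop. It does not show how SpatialJoinScala works internally and uses none of its classes. The names WithinDistanceSketch, Place, haversineMeters, and withinDistanceJoin are hypothetical, both sides are plain latitude/longitude points rather than ST_Geometry values, and distance is approximated on a sphere with the haversine formula.

```scala
object WithinDistanceSketch {
  // Mean Earth radius in meters (spherical approximation, an assumption of this sketch).
  val EarthRadiusMeters = 6371008.8

  // Hypothetical stand-in for a row with an ID and a point geometry.
  final case class Place(id: Int, latDeg: Double, lonDeg: Double)

  /** Great-circle distance in meters between two points (haversine formula). */
  def haversineMeters(a: Place, b: Place): Double = {
    val dLat = math.toRadians(b.latDeg - a.latDeg)
    val dLon = math.toRadians(b.lonDeg - a.lonDeg)
    val h = math.pow(math.sin(dLat / 2), 2) +
      math.cos(math.toRadians(a.latDeg)) * math.cos(math.toRadians(b.latDeg)) *
        math.pow(math.sin(dLon / 2), 2)
    2 * EarthRadiusMeters * math.asin(math.sqrt(h))
  }

  /** Naive join: keep every (left ID, right ID) pair whose points are within maxMeters. */
  def withinDistanceJoin(left: Seq[Place], right: Seq[Place], maxMeters: Double): Seq[(Int, Int)] =
    for {
      l <- left
      r <- right
      if haversineMeters(l, r) <= maxMeters
    } yield (l.id, r.id)
}
```

A nested loop compares every pair, which is why large joins need a pruning step before the exact distance test.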
The following example illustrates how to use the SpatialJoinScala API. It determines which poles (represented as points) hold up a set of wires (represented as line strings). The example first creates two data frames:
- pointsDf, which contains the data for the poles
- wiresDf, which contains the data for the wires
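import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{Row, RowFactory}
import org.apache.spark.sql.types.{DataTypes, StructField}
// The spatial classes used below (ST_Point, WKTReader, AcceleratedGeometryFactoryEG,
// IPointEG) must also be imported from the spatial library.

// Load the poles file. The parser below expects records of the form <id>,<WKT point>.
val inputFileName = "../../sparksqlspatial/resources/data/poles.csv"
val points = sc.textFile(inputFileName).cache()

// Schema: an integer ID and a point geometry.
val fields = ArrayBuffer[StructField]()
fields += DataTypes.createStructField("polesid", DataTypes.IntegerType, true)
fields += DataTypes.createStructField("polesgeometry", new ST_Point, true)
// createStructType accepts an Array or a java.util.List, so convert the buffer.
val schema = DataTypes.createStructType(fields.toArray)

// Parse one CSV record into a Row.
def poles_record2row(record: String): Row = {
  val fields = record.split(",")
  val reader = new WKTReader(AcceleratedGeometryFactoryEG.getInstance())
  val p = reader.read(fields(1)).asInstanceOf[IPointEG]
  val i = Integer.parseInt(fields(0))
  RowFactory.create(Int.box(i), p)
}

val rowRDD = points.map(r => poles_record2row(r))
val pointsDf = sqlCtx.createDataFrame(rowRDD, schema)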
import java.util.ArrayList

// Load the wires file. The parser below expects records of the form <id>,"<WKT geometry>".
val wiresInputFileName = "../../sparksqlspatial/resources/data/wires.csv"
val wires = sc.textFile(wiresInputFileName).cache()

// Schema: an integer ID and a general geometry.
val wireFields = new ArrayList[StructField]()
wireFields.add(DataTypes.createStructField("wiresid", DataTypes.IntegerType, true))
wireFields.add(DataTypes.createStructField("wiresgeometry", new ST_Geometry, true))
val wireSchema = DataTypes.createStructType(wireFields)

// Parse one record into a Row. The record is split on the quote character
// rather than on commas, because the quoted WKT text can itself contain commas.
def wires_record2row(record: String): Row = {
  val fields = record.split("\"")
  val reader = new WKTReader(AcceleratedGeometryFactoryEG.getInstance())
  val geom = reader.read(fields(1)).asInstanceOf[IGeometryEG]
  // fields(0) is the ID followed by the separating comma, which is stripped here.
  val id = Integer.parseInt(fields(0).replaceAll(",", ""))
  RowFactory.create(Int.box(id), geom)
}

val wiresRDD = wires.map(r => wires_record2row(r))
val wiresDf = sqlCtx.createDataFrame(wiresRDD, wireSchema)
The SpatialJoinScala API provides two types of join functions:
- Fuzzy join, which can be used for pruning.
- Distance join, which requires more time and consumes more resources than a fuzzy join, but is more accurate (that is, the result will not contain any false positives).
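The relationship between the two join types can be sketched as a coarse pruning pass followed by an exact check. The example below is only an illustration under simplifying assumptions, not the library's implementation. It uses planar x/y coordinates in meters and a square grid instead of geohashes, and the names FuzzyThenExactSketch, Pt, coarseCandidates, and refineByDistance are hypothetical.

```scala
object FuzzyThenExactSketch {
  // Hypothetical stand-in for a row with an ID and a planar point, in meters.
  final case class Pt(id: Int, x: Double, y: Double)

  // Grid cell that contains the point, for square cells of the given size.
  private def cell(p: Pt, size: Double): (Long, Long) =
    (math.floor(p.x / size).toLong, math.floor(p.y / size).toLong)

  /** Coarse pass: pair each left point with every right point in the same or an
    * adjacent cell. With cell size equal to the distance, no pair within the
    * distance is missed, but pairs that are farther apart can slip through. */
  def coarseCandidates(left: Seq[Pt], right: Seq[Pt], distance: Double): Seq[(Pt, Pt)] = {
    val buckets: Map[(Long, Long), Seq[Pt]] = right.groupBy(cell(_, distance))
    left.flatMap { l =>
      val (cx, cy) = cell(l, distance)
      for {
        dx <- Seq(-1L, 0L, 1L)
        dy <- Seq(-1L, 0L, 1L)
        r  <- buckets.getOrElse((cx + dx, cy + dy), Seq.empty[Pt])
      } yield (l, r)
    }
  }

  /** Exact pass: compute the true distance for each candidate and drop false positives. */
  def refineByDistance(candidates: Seq[(Pt, Pt)], distance: Double): Seq[(Pt, Pt, Double)] =
    candidates
      .map { case (l, r) => (l, r, math.hypot(l.x - r.x, l.y - r.y)) }
      .filter(_._3 <= distance)
}
```

The coarse pass only compares cell indexes, so it is cheap but over-inclusive. The exact pass costs one distance computation per candidate, which is why it is worth pruning first.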
Fuzzy join
val fuzzyJoin = SpatialJoinScala.fuzzyJoin(conf, pointsDf, wiresDf, geohashBitDepth, distance,
  polesGeometryColumnName, polesPrimaryKeys, wiresGeometryColumnName, wiresPrimaryKeys, geohashColumnName)
System.out.println(fuzzyJoin.count())
conf
- The SparkSession for creating new data frames.
pointsDf
- The first data frame.
wiresDf
- The second data frame, which is joined with the first.
geohashBitDepth
- The bit depth of the geohash.
distance
- The maximum distance in meters between the two geometries to be considered. For a fuzzy join, some of the rows in the result table might correspond to geometries that are slightly farther apart than the specified distance. However, all geometries that are at or within that distance will be included. For a distance join, none of the rows in the result table will correspond to geometries that are farther apart than the specified distance.
polesGeometryColumnName and wiresGeometryColumnName
- The names of the columns to be spatially joined. Both must be of data type ST_Geometry.
polesPrimaryKeys and wiresPrimaryKeys
- The primary keys of the first and second data frames. These are used to uniquely identify each row in the data frames.
geohashColumnName
- The name of the temporary column that is to be created by the fuzzy join call. Specify a name that is different from the names of any of the columns of the input data frames. This column is for internal use only and does not appear in the result.
In the previous example, a fuzzy join across about 400,000 wires and 1.2 million poles returns about 49 million rows at a bit depth of 30. Increasing the bit depth to 32 prunes the result further, to about 14 million rows. Note that a greater bit depth also increases the number of rows that are expanded internally, so there is a tradeoff between the time required for calculation and the number of results returned.
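To get a feel for why a greater bit depth prunes more, the following sketch computes the size of one geohash cell at a given bit depth. It assumes the standard geohash layout, in which bits alternate between longitude and latitude starting with longitude. Whether SpatialJoinScala uses exactly this layout is an assumption, and the names GeohashCellSizeSketch and cellSizeDegrees are hypothetical.

```scala
object GeohashCellSizeSketch {
  /** Width and height, in degrees, of one cell at the given bit depth,
    * assuming bits alternate longitude, latitude, longitude, and so on. */
  def cellSizeDegrees(bitDepth: Int): (Double, Double) = {
    require(bitDepth >= 0, "bitDepth must not be negative")
    val lonBits = (bitDepth + 1) / 2 // longitude gets the extra bit when the depth is odd
    val latBits = bitDepth / 2
    (360.0 / math.pow(2, lonBits), 180.0 / math.pow(2, latBits))
  }
}
```

Under this assumption, a bit depth of 30 gives cells of about 0.011 by 0.0055 degrees (roughly 1.2 km by 0.6 km at the equator), and a bit depth of 32 halves both dimensions. Smaller cells admit fewer distant pairs, but each geometry then touches more cells.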
Distance join
val distanceJoin = SpatialJoinScala.withinDistanceJoin(conf, pointsDf, wiresDf, geohashBitDepth, distance,
  polesGeometryColumnName, polesPrimaryKeys, wiresGeometryColumnName, wiresPrimaryKeys, geohashColumnName, distanceColumnName)
System.out.println(distanceJoin.count())
distanceColumnName
- The name of the column for the computed actual separation distance, which is returned as an additional column in the output data frame.
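One possible use of the distance column is to keep only the closest pole for each wire. The sketch below shows the idea on plain Scala collections rather than on the joined data frame, because the exact schema of the join result is not described here. The names NearestPoleSketch, JoinedRow, and nearestPolePerWire, and the row shape itself, are hypothetical.

```scala
object NearestPoleSketch {
  // Hypothetical shape of one joined row: both keys plus the computed distance.
  final case class JoinedRow(poleId: Int, wireId: Int, distanceMeters: Double)

  /** For each wire, keep the joined row with the smallest distance. */
  def nearestPolePerWire(rows: Seq[JoinedRow]): Map[Int, JoinedRow] =
    rows.groupBy(_.wireId).map { case (wireId, group) =>
      wireId -> group.minBy(_.distanceMeters)
    }
}
```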