Write a Spark batch application that shares RDDs with other Spark batch applications that are
submitted to the same Spark context.
About this task
Follow these steps to write a Spark batch application that uses the sharable RDD API to
share RDDs within a Spark context. You can use the sharable RDD API in both Scala and Python.
Note: A sharable RDD API call cannot be nested in another sharable RDD API call. If your Spark
batch application includes code similar to the following snippet:
sc.updateSharedRDD(sc.getSharedRDD(rddName).get.name, sc.getSharedRDD(rddName).get)
rewrite the code as follows:
val someRDD = sc.getSharedRDD(rddName).get
sc.updateSharedRDD(someRDD.name, someRDD)
Procedure
-
To write a Spark batch application in Scala that uses the sharable RDD API, use the
SharedContextJob API to get the Spark context instance. You must override this
trait’s “runJob” method, and place the business logic in the method.
Use the following sample code to create a Spark batch application in Scala and call it (for
example)
sharedrddtest.jar:
import org.apache.spark.deploy.contextpool.api._

object SharedRDDTest extends SharedContextJob {
  override def runJob(sc: SharedSparkContext, args: Array[String]): Any = {
    val numPart = if (args.length > 0) args(0).toInt else 4
    val numSleep = if (args.length > 1) args(1).toInt else 5000
    val oriRdd = sc.parallelize(0 until numPart, numPart).map { i =>
      Thread.sleep(numSleep)
      i
    }
    // If the program is just a data user but not a data creator,
    // i.e., the program is designed to use RDDs shared by others,
    // use "getSharedRDD".
    //
    // If the program is only a creator, i.e., it is designed to generate
    // RDDs for others to use, use "updateSharedRDD".
    //
    // If the program generates RDDs, and may use/reuse them in the future, or
    // in the next submission, use "getOrElseCreateSharedRDD". It will
    // try to get an existing shared RDD if one is available. If not, it will
    // create one, using the given RDD as the underlying RDD generator.
    // If an RDD is shared, its data blocks can be fetched directly,
    // and the RDD generator method will not be called.
    // In this example, the first submission will run for some time,
    // but the second submission will return the result immediately.
    val rdd = sc.getOrElseCreateSharedRDD("shareRDD1", oriRdd)
    rdd.count()
  }
}
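The sample uses getOrElseCreateSharedRDD; the comments also mention getSharedRDD for pure data
users and updateSharedRDD for pure data creators. The following minimal sketch shows how a
creator-only and a user-only application might be structured with the same SharedContextJob
trait. The object names and the shared RDD name "exampleSharedRDD" are illustrative only and are
not part of the product sample:
import org.apache.spark.deploy.contextpool.api._

// Illustrative sketch only: object names and the RDD name "exampleSharedRDD"
// are examples, not part of the product sample.
object SharedRDDProducer extends SharedContextJob {
  override def runJob(sc: SharedSparkContext, args: Array[String]): Any = {
    // A pure creator: build an RDD and publish it under a well-known name
    // so that other applications in the same Spark context can reuse it.
    val data = sc.parallelize(1 to 100, 4)
    sc.updateSharedRDD("exampleSharedRDD", data)
  }
}

object SharedRDDConsumer extends SharedContextJob {
  override def runJob(sc: SharedSparkContext, args: Array[String]): Any = {
    // A pure user: getSharedRDD returns an Option, so the RDD is available
    // only if a creator has already shared it in this Spark context.
    sc.getSharedRDD("exampleSharedRDD") match {
      case Some(rdd) => rdd.count()
      case None      => 0L
    }
  }
}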
-
To write a Spark batch application in Python that uses the sharable RDD API, use the
SharedContextJob API to get the Spark context instance. Instead of calling
“new SparkContext()” and “SparkContext.stop()” in the Python
program, you must wrap the original logic in a method named “run_job”.
-
Use the following sample code to create a Spark batch application in Python and call it (for
example) sharedrddtest.py:
def run_job(sc, args):
    """
    Put your import statements here, instead of at the head of the file.
    """
    from time import sleep

    def func(x):
        sleep(num_sleep)
        return x

    num_sleep = 5
    num_part = 4
    if len(args) > 0:
        num_part = int(args[0])
    if len(args) > 1:
        num_sleep = int(args[1])
    rdd1 = sc.parallelize(xrange(0, num_part), num_part).map(func)
    # Reads a shared RDD from others, e.g. from the previous Scala RDD
    rdd2 = sc.get_shared_rdd("shareRDD1")
    # Or, creates a new shared RDD
    rdd3 = sc.get_or_else_create_shared_rdd("shareRDD2", rdd1)
    # rdd2 does not take time to compute because it is already cached
    print(rdd2.count())
    # rdd3 will take some time to compute on the first submission,
    # but the time to compute will be shorter on later submissions
    print(rdd3.count())
-
Enable support for Python applications to use the sharable RDD API, so that Scala and Python
applications can reuse shared RDDs. By default, Python applications cannot use the sharable RDD API.
To enable Python applications in an instance group, modify the Spark version configuration of the
instance group to which the shared Spark batch application is submitted: edit the Spark version
configuration and set the SPARK_EGO_SHARED_PYTHON parameter to true. You can enable Python
applications only in instance groups with supported Spark versions; Spark version 1.5.2 is not
supported. For more information, see Modifying instance groups.
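As a minimal sketch, assuming the parameter is entered as a name/value pair in the Spark version
configuration of the instance group, the setting would look like this:
SPARK_EGO_SHARED_PYTHON=true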