Developing shared Spark batch applications

Write a Spark batch application that you can submit to a Spark context that shares RDDs with other Spark batch applications.

About this task

Follow these steps to write a Spark batch application that uses the sharable RDD API to share RDDs within a Spark context. You can use the sharable RDD API in both Scala and Python.
Note: A sharable RDD API call cannot be nested inside another sharable RDD API call. If your Spark batch application includes code similar to the following snippet:
sc.updateSharedRDD(sc.getSharedRDD(rddName).get.name, sc.getSharedRDD(rddName).get)
rewrite the code as follows:
val someRDD = sc.getSharedRDD(rddName).get
sc.updateSharedRDD(someRDD.name, someRDD)

Procedure

  • To write a Spark batch application in Scala that uses the sharable RDD API, use the SharedContextJob API to get the Spark context instance: extend the SharedContextJob trait, override its “runJob” method, and place the business logic in that method.
    Use the following sample code to create a Spark batch application in Scala and call it (for example) sharedrddtest.jar:
    import org.apache.spark.deploy.contextpool.api._
    
    object SharedRDDTest extends SharedContextJob {
    
      override def runJob(sc: SharedSparkContext, args: Array[String]): Any = {
        val numPart = if (args.length > 0) args(0).toInt else 4
        val numSleep = if (args.length > 1) args(1).toInt else 5000
        val oriRdd = sc.parallelize(0 until numPart, numPart).map { i =>
          Thread.sleep(numSleep)
          i
        }
        // If the program is just a data user but not a data creator,
        // i.e., the program is designed to use RDDs shared by others,
        // use "getSharedRDD".
        // 
        // If the program is only a creator, i.e., it is designed to generate
        // RDDs for others to use, use "updateSharedRDD".
        //
        // If the program generates RDDs and may use/reuse them in the future or
        // in a later submission, use "getOrElseCreateSharedRDD". It will
        // try to get an existing shared RDD if one is available. If not, it will
        // create one, using the given RDD as the underlying RDD generator.
        
        // If an RDD is shared, its data block can be fetched directly,
        // and the RDD generator method will not be called.
        
        // In this example, the first submission will run for some time,
        // but the second submission will return the result immediately.
        val rdd = sc.getOrElseCreateSharedRDD("shareRDD1", oriRdd)
        rdd.count()
      }
    }
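
    The comments in this sample describe three usage patterns. The following sketch separates the data-creator and data-user cases. It is a minimal sketch, not part of the sample above: it assumes that updateSharedRDD takes the share name and the RDD, that getSharedRDD(name).get returns the shared RDD (as in the note earlier in this topic), and it uses placeholder names such as SharedRDDProducer and producerRDD:
    import org.apache.spark.deploy.contextpool.api._

    // Data creator: generates an RDD and publishes it for other applications to reuse.
    object SharedRDDProducer extends SharedContextJob {
      override def runJob(sc: SharedSparkContext, args: Array[String]): Any = {
        val generated = sc.parallelize(1 to 100, 4).map(_ * 2)
        // "producerRDD" is the name that consumer applications use to look up the RDD.
        sc.updateSharedRDD("producerRDD", generated)
      }
    }

    // Data user: only reads an RDD that another application has already shared.
    object SharedRDDConsumer extends SharedContextJob {
      override def runJob(sc: SharedSparkContext, args: Array[String]): Any = {
        // Assumes "producerRDD" was shared earlier; its data blocks are fetched
        // directly, so the RDD generator method is not called again.
        val shared = sc.getSharedRDD("producerRDD").get
        shared.count()
      }
    }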
  • To write a Spark batch application in Python that uses the sharable RDD API, use the SharedContextJob API to get the Spark context instance. Instead of creating a SparkContext and calling “SparkContext.stop()” in the Python program, you must wrap the original logic in a method named “run_job”, which receives the shared Spark context as its first argument.
    1. Use the following sample code to create a Spark batch application in Python and call it (for example) sharedrddtest.py:
      def run_job(sc, args):
          """
          Put your import statements here, instead of at the head of the file.
          """
      
          from time import sleep
      
          def func(x):
              sleep(num_sleep)
              return x
      
          num_sleep = 5
          num_part = 4
      
          if len(args) > 0:
              num_part = int(args[0])
          if len(args) > 1:
              num_sleep = int(args[1])
          rdd1 = sc.parallelize(range(0, num_part), num_part).map(func)
          # Read a shared RDD from others, e.g., one created by the previous Scala example
          rdd2 = sc.get_shared_rdd("shareRDD1")
          # Or, creates a new RDD
          rdd3 = sc.get_or_else_create_shared_rdd("shareRDD2", rdd1)
      
          # rdd2 does not take time to compute because it is already cached
          print(rdd2.count())
          # rdd3 will take some time to compute on the first submission,
          # but the time to compute will be shorter on later submissions
          print(rdd3.count())
    2. Enable support for Python applications to use the sharable RDD API, so that Scala and Python applications can reuse shared RDDs. By default, Python applications cannot use the sharable RDD API.

      To enable Python applications in an instance group, modify the Spark version configuration of the instance group to which the shared Spark batch application is submitted: edit the Spark version configuration and set the SPARK_EGO_SHARED_PYTHON parameter to true. You can enable Python applications only in instance groups with supported Spark versions; Spark version 1.5.2 is not supported. For more information, see Modifying instance groups.
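
      The setting itself is a simple true/false parameter. Assuming your Spark version configuration exposes it as an environment-style parameter (the exact editing steps depend on your cluster management console; see Modifying instance groups), the value you set looks similar to the following:
      SPARK_EGO_SHARED_PYTHON=true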