Data analysis and performance with Spark

Spark is an up-and-coming big-data analytics solution developed for highly efficient cluster computing using in-memory processing. Its targeted usage models include those that incorporate iterative algorithms (that is, those that can benefit from keeping data in memory rather than pushing it out to a higher-latency file system). Before you work through these exercises, be sure you fully understand the Spark approach to cluster computing and how it differs from Hadoop. Read about the background and usage of Spark in the recent companion article, Spark, an alternative for fast data analytics.

Overview

These exercises give you practice in the following areas:

  • Installing and experimenting with the Scala language
  • Learning about Scala collections
  • Installing Spark and running your first job
  • Improving performance through multithreading
  • Improving performance through configuration

Prerequisites

This set of exercises requires some basic knowledge of Linux®, including the ability to install new applications. Knowledge of the Scala language is beneficial but not required. You must perform these exercises in order, as they illustrate the installation of the necessary software packages.

Exercise 1: Install and experiment with the Scala language

Begin by installing the Scala language. The process to install Scala varies depending on your platform; in the worst case, you can download the source tree and perform a build and installation. Because Spark requires a later version of Scala than many package managers provide, install it from the downloadable distribution archive, as shown in the solution to this exercise.

Once installed, start the Scala interpreter (demonstrated in the companion article, "Spark, an alternative for fast data analytics," in Related topics), try some of the examples (from Listings 1 through 3), and verify your results.

Exercise 2: Learn about Scala collections

An interesting feature of Scala is its collection library. A collection in Scala is a container of zero or more things, such as a list, set, or map. This concept is relevant to Spark, because its distributed data sets can be operated on just like a local collection. You can learn more about Scala collections in The Scala 2.8 Collections API. Peruse this reference to see how to create an array and a list collection.
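
The parallel with Spark becomes clearer when you apply higher-order operations such as map, filter, and reduceLeft to a local collection, because Spark's distributed data sets expose much the same vocabulary. A short interpreter session illustrates the idea (the res numbering depends on your session):

scala> val nums = List(1, 2, 3, 4, 5)
nums: List[Int] = List(1, 2, 3, 4, 5)

scala> nums.map(n => n * n)
res0: List[Int] = List(1, 4, 9, 16, 25)

scala> nums.filter(_ % 2 == 0)
res1: List[Int] = List(2, 4)

scala> nums.reduceLeft(_ + _)
res2: Int = 15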

Perform the following steps:

  1. Create an array of ints, and apply the reverse method to it to reverse its contents.
  2. Create a list of strings, and iterate them to print them out individually.

Exercise 3: Install Spark and run your first job

Download the latest version of Spark. The simplest way to get it is through git:

$ git clone git://github.com/mesos/spark.git

This command creates a new subdirectory called ./spark. Change (cd) into this subdirectory, and then update and compile the distribution with the simple build tool (sbt):

$ sbt/sbt update compile

Doing so downloads several packages in addition to compiling a number of Scala sources. To finish configuration, go into the ./spark/conf subdirectory, rename spark-env.sh.template to spark-env.sh, and add the following line:

export SCALA_HOME=/opt/scala-2.9.1.final

Remember also to add $SCALA_HOME/bin to your PATH.

Now that Spark is installed, run the SparkPi example program with one thread on the local host. Use the companion article as a guide to accomplish this task. (See the article, "Spark, an alternative for fast data analytics," in Related topics.)
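
To understand what the job computes, the following is a sketch modeled on the SparkPi example bundled with Spark (details may differ slightly from the version in your checkout). It estimates Pi with a Monte Carlo method: random points are generated in parallel across the requested slices, and the fraction that lands inside the unit circle approximates Pi/4.

import spark.SparkContext
import scala.math.random

// A sketch modeled on the bundled SparkPi example; treat it as illustrative
// rather than a copy of the shipped source.
object PiSketch {
  def main(args: Array[String]) {
    // args(0) is the master, for example "local" or "local[2]";
    // args(1), if present, is the number of slices.
    val sc = new SparkContext(args(0), "PiSketch")
    val slices = if (args.length > 1) args(1).toInt else 2
    val n = 100000 * slices

    // Each element becomes one random point in the 2x2 square centered on the
    // origin; count how many points fall inside the unit circle.
    val count = sc.parallelize(1 to n, slices).map { _ =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _)

    println("Pi is roughly " + 4.0 * count / n)
  }
}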

Exercise 4: Improve performance with multithreading

This exercise explores the performance difference that multithreading makes with Spark. Using the SparkPi sample program, you can change the number of threads associated with a particular execution.

Using the companion article as a guide, experiment with the threads parameter in the local context and note the differences in execution times.
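
Under the covers, the local[N] argument is simply the master string handed to the SparkContext, so you can observe the same effect from your own code. The following is a minimal sketch against the spark.SparkContext API used by this version of Spark; the object name and workload are hypothetical.

import spark.SparkContext

// A minimal, hypothetical timing sketch: run it with "local[1]", "local[2]",
// and so on to vary the number of worker threads.
object ThreadTiming {
  def main(args: Array[String]) {
    val master = if (args.length > 0) args(0) else "local[2]"
    val sc = new SparkContext(master, "ThreadTiming")

    val start = System.currentTimeMillis
    // A trivially parallel workload split into eight slices.
    val sum = sc.parallelize(1 to 1000000, 8).map(_.toLong).reduce(_ + _)
    println("sum = " + sum + " computed in " +
      (System.currentTimeMillis - start) + " ms")
  }
}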

Exercise 5: Improve performance through configuration

Spark supports several configuration elements that enable higher performance. For a clustered configuration of Spark with Mesos, consider the following questions:

  • What configuration items (see ./conf) can help with performance, considering Spark's feature of in-memory processing?
  • Similarly (see the Spark FAQ link in Related topics), what system property can improve performance for dataset caching?

Exercise solutions

Some output may differ depending on your version of Scala and Spark.

Solution for Exercise 1

Perform an installation of Scala, and try some of the examples (illustrated in Listing 1). From the companion article, "Spark, an alternative for fast data analytics," in Related topics, you can see how Scala is installed from its distribution and made accessible. You can add the export lines to your shell startup file (for example, ~/.bashrc) so that they persist across sessions.

Listing 1. Installing and experimenting with Scala
$ wget http://www.scala-lang.org/downloads/distrib/files/scala-2.9.1.final.tgz
$ sudo tar xvfz scala-2.9.1.final.tgz --directory /opt
$ export SCALA_HOME=/opt/scala-2.9.1.final
$ export PATH=$SCALA_HOME/bin:$PATH
$ echo $PATH
/opt/scala-2.9.1.final/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
$ 
$ scala
Welcome to Scala version 2.9.1.final (OpenJDK Client VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> def square(x: Int) = x*x
square: (x: Int)Int

scala> square(3)
res0: Int = 9

scala> square(res0)
res1: Int = 81

scala> :quit
$

Solution for Exercise 2

For these examples, use the Scala interpreter to verify your results. Listing 2 provides the solution for the array exercise.

Listing 2. Creating and reversing an array
scala> val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
numbers: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> numbers.reverse
res1: Array[Int] = Array(9, 8, 7, 6, 5, 4, 3, 2, 1)

scala>

Listing 3 provides the solution to the list exercise.

Listing 3. Creating and iterating a list of strings
scala> val animals = List("dog", "cat", "mouse")
animals: List[java.lang.String] = List(dog, cat, mouse)

scala> animals foreach println
dog
cat
mouse

scala> for (elem <- animals) println(elem)
dog
cat
mouse

scala>

Solution for Exercise 3

You execute the SparkPi test through the ./run command, specifying the application and the host/slices argument, as shown in Listing 4.

Listing 4. Executing the SparkPi test locally
$ ./run spark.examples.SparkPi local
12/01/23 20:55:33 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/01/23 20:55:33 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/01/23 20:55:33 INFO spark.SparkContext: Starting job...
12/01/23 20:55:33 INFO spark.CacheTracker: Registering RDD ID 0 with cache
12/01/23 20:55:33 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
12/01/23 20:55:33 INFO spark.CacheTrackerActor: Asked for current cache locations
12/01/23 20:55:33 INFO spark.LocalScheduler: Final stage: Stage 0
12/01/23 20:55:33 INFO spark.LocalScheduler: Parents of final stage: List()
12/01/23 20:55:33 INFO spark.LocalScheduler: Missing parents: List()
12/01/23 20:55:33 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents
12/01/23 20:55:33 INFO spark.LocalScheduler: Running task 0
12/01/23 20:55:33 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes
12/01/23 20:55:34 INFO spark.LocalScheduler: Finished task 0
12/01/23 20:55:34 INFO spark.LocalScheduler: Running task 1
12/01/23 20:55:34 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
12/01/23 20:55:34 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes
12/01/23 20:55:34 INFO spark.LocalScheduler: Finished task 1
12/01/23 20:55:34 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
12/01/23 20:55:34 INFO spark.SparkContext: Job finished in 0.3042134 s
Pi is roughly 3.13768
$

Solution for Exercise 4

You specify the number of threads for the SparkPi test program with the local[N] (host) argument; the number in brackets is the number of threads to associate with the run. Execution times will of course vary with the number of hardware threads in your system. The solution in Listing 5 illustrates execution with one and two threads.

As shown, the first run with one thread requires about 0.6 seconds, while the second with two threads finishes in about 0.1 seconds. Your results might vary.

Listing 5. Executing SparkPi with a different number of threads
$ ./run spark.examples.SparkPi local[1]
12/01/24 18:50:41 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/01/24 18:50:41 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/01/24 18:50:41 INFO spark.SparkContext: Starting job...
12/01/24 18:50:41 INFO spark.CacheTracker: Registering RDD ID 0 with cache
12/01/24 18:50:41 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
12/01/24 18:50:41 INFO spark.CacheTrackerActor: Asked for current cache locations
12/01/24 18:50:41 INFO spark.LocalScheduler: Final stage: Stage 0
12/01/24 18:50:41 INFO spark.LocalScheduler: Parents of final stage: List()
12/01/24 18:50:41 INFO spark.LocalScheduler: Missing parents: List()
12/01/24 18:50:41 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents
12/01/24 18:50:41 INFO spark.LocalScheduler: Running task 0
12/01/24 18:50:41 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes
12/01/24 18:50:42 INFO spark.LocalScheduler: Finished task 0
12/01/24 18:50:42 INFO spark.LocalScheduler: Running task 1
12/01/24 18:50:42 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes
12/01/24 18:50:42 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
12/01/24 18:50:42 INFO spark.LocalScheduler: Finished task 1
12/01/24 18:50:42 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
12/01/24 18:50:42 INFO spark.SparkContext: Job finished in 0.595091783 s
Pi is roughly 3.12736
$ ./run spark.examples.SparkPi local[2]
12/01/24 18:50:46 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/01/24 18:50:46 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/01/24 18:50:46 INFO spark.SparkContext: Starting job...
12/01/24 18:50:46 INFO spark.CacheTracker: Registering RDD ID 0 with cache
12/01/24 18:50:46 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
12/01/24 18:50:46 INFO spark.CacheTrackerActor: Asked for current cache locations
12/01/24 18:50:46 INFO spark.LocalScheduler: Final stage: Stage 0
12/01/24 18:50:46 INFO spark.LocalScheduler: Parents of final stage: List()
12/01/24 18:50:46 INFO spark.LocalScheduler: Missing parents: List()
12/01/24 18:50:46 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents
12/01/24 18:50:46 INFO spark.LocalScheduler: Running task 0
12/01/24 18:50:46 INFO spark.LocalScheduler: Running task 1
12/01/24 18:50:46 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes
12/01/24 18:50:46 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes
12/01/24 18:50:46 INFO spark.LocalScheduler: Finished task 1
12/01/24 18:50:46 INFO spark.LocalScheduler: Finished task 0
12/01/24 18:50:46 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
12/01/24 18:50:46 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
12/01/24 18:50:46 INFO spark.SparkContext: Job finished in 0.098092002 s
Pi is roughly 3.14388
$

Note that instead of local, you could provide a Mesos master to connect to, which distributes the job across a cluster of nodes rather than across multiple threads on a single node (typically the higher-performing option).

To figure out how many hardware threads (virtual CPUs) are available to you, run the following command line:

$ grep processor /proc/cpuinfo

Solution for Exercise 5

Although the Spark configuration file (./conf/spark-env.sh) defines the key elements of the environment, one option (SPARK_MEM) specifies how much memory to allocate to each node's Java™ Virtual Machine. Given Spark's focus on in-memory datasets, more memory will generally result in improved performance (depending on the workload).

As described in the Spark FAQ, some datasets might not fit entirely in memory. When this occurs, Spark either recomputes the partitions that don't fit (the default) or, if configured, spills them to disk. To spill partitions to disk instead of recomputing them, enable the spark.DiskSpillingCache behavior described in the FAQ (see the Spark FAQ link in Related topics).
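
As a rough illustration, the sketch below caches a dataset and reuses it, which is exactly the pattern that benefits from a larger SPARK_MEM and from disk spilling when memory runs short. It is a minimal sketch against the old spark.SparkContext API; the system-property mechanism and the spark.cache.class property name are assumptions drawn from the Spark FAQ of this era, so verify them against the documentation for your release.

import spark.SparkContext

object CacheConfigSketch {
  def main(args: Array[String]) {
    // Assumption: in this version of Spark, the cache implementation is chosen
    // through a Java system property read when the SparkContext starts.
    // Verify the exact property name against the Spark FAQ for your release.
    System.setProperty("spark.cache.class", "spark.DiskSpillingCache")

    // "local[2]" runs locally with two threads; a Mesos master would be used
    // for a clustered configuration.
    val sc = new SparkContext("local[2]", "CacheConfigSketch")

    // Cache a derived dataset and reuse it: the second action reads from the
    // cache (memory, or disk if partitions spilled) instead of recomputing.
    val squares = sc.parallelize(1 to 1000000, 8).map(i => i.toLong * i).cache()
    println("sum = " + squares.reduce(_ + _))
    println("max = " + squares.reduce((a, b) => if (a > b) a else b))
  }
}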


Related topics

