Contents
- Introduction
- Overview
- Exercise 1: Install and experiment with the Scala language
- Exercise 2: Learn about Scala collections
- Exercise 3: Install Spark and run your first job
- Exercise 4: Improve performance with multithreading
- Exercise 5: Improve performance through configuration
- Exercise solutions
- Related topics
Data analysis and performance with Spark
Spark is an up-and-coming big-data analytics solution developed for highly efficient cluster computing using in-memory processing. Its target usage models include iterative algorithms (that is, those that benefit from keeping data in memory rather than pushing it to a higher-latency file system). Before you work through these exercises, be sure you fully understand the Spark approach to cluster computing and how it differs from Hadoop. Read about the background and usage of Spark in the recent companion article, Spark, an alternative for fast data analytics.
Overview
These exercises give you practice in the following areas:
- Installing and experimenting with the Scala language
- Learning about Scala collections
- Installing Spark and running your first job
- Improving performance through multithreading
- Improving performance through configuration
Prerequisites
This set of exercises requires some basic knowledge of Linux®, including the ability to install new applications. Knowledge of the Scala language is beneficial but not required. You must perform these exercises in order, as they illustrate the installation of the necessary software packages.
Exercise 1: Install and experiment with the Scala language
Begin by installing the Scala language. The process to install Scala will vary depending on your platform. In the worst case, you can download the source tree and perform a build and installation.
Once installed, start the Scala interpreter (demonstrated in the companion article, "Spark, an alternative for fast data analytics," in Related topics), try some of the examples (from Listings 1 through 3), and verify your results.
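If you want to confirm the installation before starting the interpreter, one quick check is the interpreter's version flag (shown here as a sketch; the output will reflect your installed release):

$ scala -version
Scala code runner version 2.9.1.final -- Copyright 2002-2011, LAMP/EPFL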
Exercise 2: Learn about Scala collections
An interesting feature of Scala is its collection library. A collection in Scala is a container of zero or more things, such as a list, set, or map. This concept is relevant to Spark, because its distributed data sets can be operated on just like a local collection. You can learn more about Scala collections in The Scala 2.8 Collections API. Peruse this reference to see how to create an array and a list collection.
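As a quick taste of the collections API before you start, here is a hypothetical interpreter session (the example values are arbitrary, and the exact result numbering and type names vary by Scala version):

scala> val fruits = List("apple", "banana", "cherry")
fruits: List[java.lang.String] = List(apple, banana, cherry)

scala> fruits.map(_.length)
res0: List[Int] = List(5, 6, 6)

scala> Map("a" -> 1, "b" -> 2)
res1: scala.collection.immutable.Map[java.lang.String,Int] = Map(a -> 1, b -> 2)

The same transformation style (map, filter, reduce, and so on) carries over to Spark's distributed data sets.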
Perform the following steps:
- Create an array of ints, and apply the reverse method to it to reverse its contents.
- Create a list of strings, and iterate over them to print them out individually.
Exercise 3: Install Spark and run your first job
Download the latest version of Spark. The simplest way to get it is through git:

$ git clone git://github.com/mesos/spark.git

This command results in a new subdirectory called ./spark. cd into this subdirectory. Now update and compile the distribution with the simple build tool (sbt):

$ sbt/sbt update compile
Doing so downloads several packages in addition to compiling a number of Scala sources. To finish configuration, go into the ./spark/conf subdirectory, rename spark-env.sh.template to spark-env.sh, and add the following line:
export SCALA_HOME=/opt/scala-2.9.1.final
Remember also to add SCALA_HOME/bin to your PATH.
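Taken together, the configuration steps amount to something like the following sketch (the template file name and the Scala path assume the checkout and installation described above; adjust them for your system):

$ cd spark/conf
$ mv spark-env.sh.template spark-env.sh
$ echo 'export SCALA_HOME=/opt/scala-2.9.1.final' >> spark-env.sh
$ export PATH=$SCALA_HOME/bin:$PATH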
Now that Spark is installed, run the SparkPi example program with one thread on the local host. Use the companion article as a guide to accomplish this task. (See the article, "Spark, an alternative for fast data analytics," in Related topics.)
Exercise 4: Improve performance with multithreading
This exercise explores the performance difference that multithreading makes with Spark. Using the SparkPi sample program, you can change the number of threads associated with a particular execution.
Using the reference article as a guide, experiment with the thread count in the local argument and note the difference in execution times.
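The thread count is given in brackets as part of the local argument. For example, to associate four threads with the run (assuming your machine has at least four hardware threads), you would invoke the sample as follows:

$ ./run spark.examples.SparkPi local[4]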
Exercise 5: Improve performance through configuration
Spark supports several configuration elements that enable higher performance. Consider a clustered configuration of Spark with Mesos:
- What configuration items (see ./conf) can help with performance, considering Spark's feature of in-memory processing?
- Similarly (see the Spark FAQ link in Related topics), what system property can improve performance for dataset caching?
Exercise solutions
Some output may differ depending on your version of Scala and Spark.
Solution for Exercise 1
Perform an installation of Scala, and try some of the examples (illustrated in Listing 1). From the companion article, "Spark, an alternative for fast data analytics," in Related topics, you can see how Scala is installed from its distribution and made accessible. You can add the export lines to your shell startup file to make them persistent.
Listing 1. Installing and experimenting with Scala
$ wget http://www.scala-lang.org/downloads/distrib/files/scala-2.9.1.final.tgz
$ sudo tar xvfz scala-2.9.1.final.tgz --directory /opt
$ export SCALA_HOME=/opt/scala-2.9.1.final
$ export PATH=$SCALA_HOME/bin:$PATH
$ echo $PATH
/opt/scala-2.9.1.final/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
$
$ scala
Welcome to Scala version 2.9.1.final (OpenJDK Client VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> def square(x: Int) = x*x
square: (x: Int)Int

scala> square(3)
res0: Int = 9

scala> square(res0)
res1: Int = 81

scala> :quit
$
Solution for Exercise 2
For these examples, use the Scala interpreter to verify your results. Listing 2 provides the solution for the array exercise.
Listing 2. Creating and reversing an array
scala> val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
numbers: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> numbers.reverse
res1: Array[Int] = Array(9, 8, 7, 6, 5, 4, 3, 2, 1)

scala>
Listing 3 provides the solution to the list exercise.
Listing 3. Creating and iterating a list of strings
scala> val animals = List("dog", "cat", "mouse")
animals: List[java.lang.String] = List(dog, cat, mouse)

scala> animals foreach println
dog
cat
mouse

scala> for (elem <- animals) println(elem)
dog
cat
mouse

scala>
Solution for Exercise 3
You execute the SparkPi test through the ./run command, specifying the application and the host/slices argument. Listing 4 shows this execution.
Listing 4. Executing the SparkPi test locally
$ ./run spark.examples.SparkPi local
12/01/23 20:55:33 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/01/23 20:55:33 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/01/23 20:55:33 INFO spark.SparkContext: Starting job...
12/01/23 20:55:33 INFO spark.CacheTracker: Registering RDD ID 0 with cache
12/01/23 20:55:33 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
12/01/23 20:55:33 INFO spark.CacheTrackerActor: Asked for current cache locations
12/01/23 20:55:33 INFO spark.LocalScheduler: Final stage: Stage 0
12/01/23 20:55:33 INFO spark.LocalScheduler: Parents of final stage: List()
12/01/23 20:55:33 INFO spark.LocalScheduler: Missing parents: List()
12/01/23 20:55:33 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents
12/01/23 20:55:33 INFO spark.LocalScheduler: Running task 0
12/01/23 20:55:33 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes
12/01/23 20:55:34 INFO spark.LocalScheduler: Finished task 0
12/01/23 20:55:34 INFO spark.LocalScheduler: Running task 1
12/01/23 20:55:34 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
12/01/23 20:55:34 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes
12/01/23 20:55:34 INFO spark.LocalScheduler: Finished task 1
12/01/23 20:55:34 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
12/01/23 20:55:34 INFO spark.SparkContext: Job finished in 0.3042134 s
Pi is roughly 3.13768
$
Solution for Exercise 4
You specify the number of threads for the SparkPi test program with the local (host) argument; the number given in brackets is the number of threads to associate with the run. Execution time of course differs depending on the number of hardware threads in your system. The solution in Listing 5 illustrates execution with one and two threads.
As shown, the first run with one thread requires about 0.6 seconds, while the second run with two threads finishes in about 0.1 seconds. Your times might vary.
Listing 5. Executing SparkPi with a different number of threads
$ ./run spark.examples.SparkPi local[1]
12/01/24 18:50:41 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/01/24 18:50:41 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/01/24 18:50:41 INFO spark.SparkContext: Starting job...
12/01/24 18:50:41 INFO spark.CacheTracker: Registering RDD ID 0 with cache
12/01/24 18:50:41 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
12/01/24 18:50:41 INFO spark.CacheTrackerActor: Asked for current cache locations
12/01/24 18:50:41 INFO spark.LocalScheduler: Final stage: Stage 0
12/01/24 18:50:41 INFO spark.LocalScheduler: Parents of final stage: List()
12/01/24 18:50:41 INFO spark.LocalScheduler: Missing parents: List()
12/01/24 18:50:41 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents
12/01/24 18:50:41 INFO spark.LocalScheduler: Running task 0
12/01/24 18:50:41 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes
12/01/24 18:50:42 INFO spark.LocalScheduler: Finished task 0
12/01/24 18:50:42 INFO spark.LocalScheduler: Running task 1
12/01/24 18:50:42 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes
12/01/24 18:50:42 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
12/01/24 18:50:42 INFO spark.LocalScheduler: Finished task 1
12/01/24 18:50:42 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
12/01/24 18:50:42 INFO spark.SparkContext: Job finished in 0.595091783 s
Pi is roughly 3.12736
$ ./run spark.examples.SparkPi local[2]
12/01/24 18:50:46 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/01/24 18:50:46 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/01/24 18:50:46 INFO spark.SparkContext: Starting job...
12/01/24 18:50:46 INFO spark.CacheTracker: Registering RDD ID 0 with cache
12/01/24 18:50:46 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
12/01/24 18:50:46 INFO spark.CacheTrackerActor: Asked for current cache locations
12/01/24 18:50:46 INFO spark.LocalScheduler: Final stage: Stage 0
12/01/24 18:50:46 INFO spark.LocalScheduler: Parents of final stage: List()
12/01/24 18:50:46 INFO spark.LocalScheduler: Missing parents: List()
12/01/24 18:50:46 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents
12/01/24 18:50:46 INFO spark.LocalScheduler: Running task 0
12/01/24 18:50:46 INFO spark.LocalScheduler: Running task 1
12/01/24 18:50:46 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes
12/01/24 18:50:46 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes
12/01/24 18:50:46 INFO spark.LocalScheduler: Finished task 1
12/01/24 18:50:46 INFO spark.LocalScheduler: Finished task 0
12/01/24 18:50:46 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
12/01/24 18:50:46 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
12/01/24 18:50:46 INFO spark.SparkContext: Job finished in 0.098092002 s
Pi is roughly 3.14388
$
Note that instead of local, you could provide a Mesos master to connect to, which supports a cluster of nodes rather than multiple threads on a single node (the higher-performing option).
To figure out how many hardware threads (virtual CPUs) are available to you, run the following command line:
$ grep processor /proc/cpuinfo
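Each matching line corresponds to one virtual CPU; to get just the count, you can have grep count the matches instead:

$ grep -c processor /proc/cpuinfo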
Solution for Exercise 5
The Spark configuration file (./conf/spark-env.sh) defines the key elements of the environment. One option, SPARK_MEM, specifies how much memory to allocate to each node's Java™ Virtual Machine. Given Spark's focus on in-memory data sets, more memory results in improved performance (depending on the workload).
As defined in the Spark FAQ, some data sets might not fit entirely in memory. When this occurs, Spark will either recompute the partition that didn't fit (the default) or, if configured, cache the partition to disk. To spill the partition to disk instead of recomputing it, use the spark.DiskSpillingCache property.
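As a sketch of where these settings live (the memory value here is an arbitrary example, and the exact way the caching property is passed to the JVM depends on your Spark release; check your conf templates and the FAQ):

# in ./conf/spark-env.sh
export SPARK_MEM=2g   # example value: memory for each node's JVM
# System properties such as the disk-spilling cache setting are passed to the JVM;
# the exact property value is release dependent (see the Spark FAQ), for example:
# export SPARK_JAVA_OPTS="-Dspark.DiskSpillingCache=..."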
Related topics
- Learn more about The Scala Programming Language on the project website.
- Find out about Spark Cluster Computing Frameworks on the project website.
- Read the author's article Spark, an alternative for fast data analytics on the background and usage of Spark (developerWorks, November 2011).
- Peruse The Scala 2.8 Collections API for information on the Scala collections and how to create an array and a list collection (Martin Odersky and Lex Spoon, September 2010).
- Visit the Spark FAQ for more information.