by Tin H To and Robin Tanenbaum
IBM Open Data Analytics for z/OS (IzODA) Spark is a high-performance, general execution engine, designed to perform large-scale data processing with ease. Spark applications can utilize the memory and processor resources provided by the Spark cluster to perform complex data processing very efficiently.
There are many types of Spark applications, and each has its own set of characteristics and resource requirements. Spark, being a general execution engine, provides many different ways of tuning at the application level and at the environment level depending on application needs. Some of the most commonly asked questions regarding Spark tuning are:
- How many cores and how much memory do I need per executor?
- How many cores do I need for the application?
- How many partitions should I create for a given size of data to be processed?
Getting these answers right is critical to achieving optimal performance.
It is common to have a Spark application read in a large amount of data from external data sources, such as a database server, perform a series of actions and transformations on the data, and output the result. The example used in this blog is a simple Spark application that does exactly that. We will look at how different tuning parameters impact its performance, and some of the best practices for this type of application.
The general workflow of this application, running on a Spark 2.2 cluster, is as follows:
- Read data from external data source via JDBC
- Perform data cleansing
- Perform data transformations (sort, group-by, UDF, Window function, etc.)
- Report result
Our application processes 3 sets of data:
- Small: 265 million rows
- Medium: 825 million rows
- Large: 2686 million rows
The data consists of a fact table (number of rows shown above) and several dimension tables; all reside in an IBM Db2® for z/OS® server.
Observations – Executor cores
Generally speaking, your application will run faster as the total number of executor cores increases, assuming the application has enough parallelism to fully utilize the cluster resources. However, perhaps unsurprisingly, the performance gain is not linear, because application performance is also constrained by other aspects of the system (memory, I/O, overhead, etc.). For example, our application showed diminishing returns as the total number of executor cores increased.
The result depends on the application and the environment in which it runs. Therefore, it is important to determine the optimal number of total executor cores needed to enable your application to achieve good performance.
Regarding the number of cores per executor, the goals are:
- Have enough cores in each executor so that every executor can run multiple tasks in parallel...
- ...but not so many cores that tasks contend for resources within the same executor JVM
With our application, we found that performance increased nearly linearly as we assigned first 2, then 4, then 8 processors per executor. Performance continued to improve as we added processors beyond 8, but the benefit of each additional processor decreased. Again, the result depends on the application and the environment in which it runs; we encourage you to use this as a reference point and to experiment with different numbers and configurations.
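When experimenting along these lines, it can help to see how a core budget turns into executors and concurrent task slots. The helper below is hypothetical (not a Spark API) and is only a sketch: it assumes standalone mode, where spark.cores.max caps an application's total cores and spark.executor.cores sets the cores per executor, with spark.task.cpus at its default of 1 unless overridden. Actual executor placement also depends on the resources each worker offers.

```scala
// Hypothetical planning helper (not a Spark API): maps a core budget onto
// executors and concurrent task slots, assuming standalone mode where
// spark.cores.max caps the application's total cores.
final case class ExecutorPlan(executors: Int, coresPerExecutor: Int, taskSlots: Int)

object ExecutorSizing {
  def plan(totalCores: Int, coresPerExecutor: Int, taskCpus: Int = 1): ExecutorPlan = {
    require(coresPerExecutor > 0 && taskCpus > 0, "cores must be positive")
    require(totalCores >= coresPerExecutor, "budget must fit at least one executor")
    require(coresPerExecutor >= taskCpus, "each executor must fit at least one task")
    // Leftover cores that do not fill a whole executor stay unused.
    val executors = totalCores / coresPerExecutor
    // Each running task occupies spark.task.cpus cores of its executor.
    val slotsPerExecutor = coresPerExecutor / taskCpus
    ExecutorPlan(executors, coresPerExecutor, executors * slotsPerExecutor)
  }
}
```

For example, plan(totalCores = 48, coresPerExecutor = 8) yields 6 executors and 48 task slots. Sweeping coresPerExecutor over 2, 4, 8 and beyond while timing each run is one way to set up the kind of comparison described above.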
Observations – Executor memory
According to the Apache Spark documentation, memory usage in Spark largely falls under one of two categories:
- Execution: For computation in shuffles, joins, sorts and aggregations
- Storage: For caching and propagating internal data across the cluster
Unlike the previous memory model (Spark 1.5 and before), execution and storage now share a unified memory region. By default, the unified memory region is 60% of (executor JVM heap size - 300MB); this is configurable through spark.memory.fraction. The remaining 40% is used for user data structures, Spark internal metadata, etc. Either execution or storage can acquire all the available memory in the shared region. If necessary, execution may evict memory used for storage, but by default only until storage drops to 50% of the unified region. Storage, however, cannot evict memory used by execution. This eviction threshold is configured through spark.memory.storageFraction. You can modify these settings on a per-application basis.
Figure: example executor memory layout with spark.executor.memory = 20g and spark.memory.fraction = 0.6 (the remaining 0.4 lies outside the unified region)
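The arithmetic behind these regions can be sketched in a few lines. The object below is an approximation written for this explanation, not Spark's own UnifiedMemoryManager; it assumes the JVM heap equals spark.executor.memory, whereas Spark itself starts from the heap size the JVM reports, which can be slightly smaller.

```scala
// Approximation of the Spark 2.x unified memory model, for back-of-the-envelope
// sizing only. Assumes the JVM heap equals spark.executor.memory.
object UnifiedMemory {
  // Spark reserves a fixed 300MB before applying spark.memory.fraction.
  val ReservedBytes: Long = 300L * 1024 * 1024

  final case class Regions(unified: Long, storage: Long, other: Long)

  def regions(heapBytes: Long,
              memoryFraction: Double = 0.6,  // spark.memory.fraction default
              storageFraction: Double = 0.5  // spark.memory.storageFraction default
             ): Regions = {
    require(heapBytes > ReservedBytes, "heap must exceed the reserved memory")
    val usable = heapBytes - ReservedBytes
    val unified = math.round(usable * memoryFraction)
    // Storage memory up to this size cannot be evicted by execution.
    val storage = math.round(unified * storageFraction)
    Regions(unified, storage, usable - unified)
  }

  def toMB(bytes: Long): Long = bytes / (1024 * 1024)
}
```

With a 20g heap and default settings, regions(20L * 1024 * 1024 * 1024) gives about 12108MB of unified memory (6054MB of which is storage that execution cannot evict) and 8072MB for everything else.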
We use the default settings for our application as they are applicable to most workloads. Increasing the spark.memory.storageFraction value may boost the performance of your application if it performs data caching more heavily than aggregations. On the other hand, if your application performs a lot of aggregations while caching very little intermediate data, you may reduce the spark.memory.storageFraction value. (Source)
Our application uses Spark SQL to read data from Db2 into DataFrames. The DataFrame API incorporates various optimization techniques (optimized logical and physical query plan, more compact data format, etc.) to increase memory and speed efficiency.
With respect to memory efficiency, our experience with DataFrames is similar to Databricks': a cached DataFrame uses about 25% of the memory footprint of the equivalent cached RDD. The gain may vary depending on the data. Conversely, if you need to convert DataFrames into RDDs in your application (for example, when you are working with an API that only accepts RDDs as input), pay special attention to memory usage (especially when caching the RDDs), because the converted RDDs might be significantly larger than their DataFrame counterparts.
When a Spark application runs out of executor memory, the outcome depends on the operation the application is performing at that time. In the most typical case, the application fails with a "java.lang.OutOfMemoryError" error message. In some cases, however, the application does not crash but instead takes much longer to run. For instance, our application performs a lot of aggregations on the data while not being very demanding on cache memory. When execution memory is inadequate, we see shuffle data spilled to disk, and these spills hurt the performance of the application.
Heap size also impacts Java’s garbage collection behavior (source). A small heap may require frequent garbage collections to make space for the creation of new objects. A larger heap may reduce the frequency of collections, while increasing the pause time required for collections to complete. Experimentation is needed to find the optimal heap size for a particular application with its specific set of performance requirements.
You may use IBM’s Garbage Collection and Memory Visualizer (GCMV) tool to help understand garbage collection behavior during application execution. For more detailed information, see Section 5.4 of our Resource Tuning Recommendations for IBM z/OS Platform for Apache Spark article.
For our application, we get satisfactory results by using the following settings:
- Memory per executor: 50GB
- Total memory for all executors: 50GB (small), 200GB (medium), 400GB (large)
However, memory requirements highly depend on both the data and how the application is written. For instance, DataFrames with a higher number of columns require more memory than DataFrames with fewer columns for the same number of rows. Applications that cache data heavily require more memory as well (although you can take a performance hit and partially cache to disk instead). We strongly encourage you to use the numbers above only as a reference and experiment with different settings for your application and environment.
Observations – Number of Partitions
Spark derives its power from its ability to process a massive amount of data in parallel. In order to do so, the data needs to be divided into multiple partitions so it can be distributed among the executors and processed in parallel.
By default, without specifying the partition-related properties (partitionColumn, lowerBound, upperBound, numPartitions), Spark uses only one JDBC connection on a single executor to read data from the data source (Db2 in our case), and the resultant DataFrame will only have one partition. Not only will the resultant DataFrame not have enough partitions to achieve parallelism (unless repartitioned afterwards), it will not utilize multiple concurrent JDBC connections to parallelize the data pull.
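val jdbcDFFact = spark.read.format("jdbc")
  .option("url", url).option("user", username).option("password", password)
  .option("dbtable", "TABLE_FACT").option("partitionColumn", "fact_id")
  .option("lowerBound", "0").option("upperBound", "10000").option("numPartitions", "10").load()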
In the example code above, Spark reads the "TABLE_FACT" table into a DataFrame. We set numPartitions to 10 so that the resultant DataFrame has 10 partitions, using the value in the "fact_id" column (partitionColumn) to determine which partition a given row goes to, and lowerBound and upperBound to determine the value range for each partition:
- range = (upperBound – lowerBound) / numPartitions = 1000
The resultant partitions are logically equivalent to the following SQL queries:
- SELECT * FROM TABLE_FACT WHERE FACT_ID < 1000 OR FACT_ID IS NULL
- SELECT * FROM TABLE_FACT WHERE FACT_ID >= 1000 AND FACT_ID < 2000
- SELECT * FROM TABLE_FACT WHERE FACT_ID >= 2000 AND FACT_ID < 3000
- ... (and so on, in steps of 1000)
- SELECT * FROM TABLE_FACT WHERE FACT_ID >= 9000
Notice that lowerBound and upperBound are not range-filters, as they are used only for determining the range values of the partitions. Therefore, no data is left unread.
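The partitioning can be reproduced outside Spark to check a choice of bounds. The sketch below is a simplified re-implementation, written for illustration, of the stride rule Spark 2.2 applies when it turns partitionColumn, lowerBound, upperBound, and numPartitions into one WHERE clause per partition; it is not Spark's code and only covers the multi-partition case.

```scala
// Simplified re-implementation of the stride logic Spark 2.2 uses to split a
// JDBC read into numPartitions WHERE clauses (illustration only).
object JdbcPartitions {
  def whereClauses(column: String, lowerBound: Long, upperBound: Long,
                   numPartitions: Int): Seq[String] = {
    require(numPartitions > 1 && upperBound - lowerBound >= numPartitions,
      "this sketch only covers the multi-partition case")
    // Spark divides each bound by numPartitions separately (integer division).
    val stride = upperBound / numPartitions - lowerBound / numPartitions
    (0 until numPartitions).map { i =>
      val lo = lowerBound + i * stride
      val hi = lo + stride
      if (i == 0) s"$column < $hi OR $column IS NULL"    // open below, plus NULLs
      else if (i == numPartitions - 1) s"$column >= $lo" // open above
      else s"$column >= $lo AND $column < $hi"
    }
  }
}
```

whereClauses("FACT_ID", 0, 10000, 10) yields the ten predicates listed above. It also makes the point about bounds concrete: a FACT_ID of -5 or 50000 still matches the first or last predicate, so no row is filtered out.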
Deciding which column to use for partitionColumn is vital. Ideally, all partitions should contain a comparable amount of data (number of rows, in the context of a DataFrame) to prevent data skew, in which some tasks take significantly more time and system resources to finish than others, resulting in reduced parallelism and performance. Choosing the right column to partition on may also reduce the need to repartition and reshuffle the data at later stages of the application.
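One way to spot skew before settling on a partitionColumn is to take a sample of the candidate column's values, assign them to partitions with the same stride rule the JDBC reader uses, and compare partition sizes. The helper below is hypothetical (not a Spark API) and assumes a numeric key column; how the sample is obtained (for example, a small query against Db2) is left open.

```scala
// Hypothetical skew check (not a Spark API): assign sampled values of a
// candidate partitionColumn to JDBC partitions with the same stride rule Spark
// uses, then compare the largest partition with the average one.
object SkewCheck {
  def partitionIndex(v: Long, lowerBound: Long, upperBound: Long, n: Int): Int = {
    val stride = upperBound / n - lowerBound / n
    require(stride > 0, "bounds are too close together for n partitions")
    // Out-of-range values still go to the first or last partition.
    val raw = (v - lowerBound) / stride
    math.min(math.max(raw, 0L), (n - 1).toLong).toInt
  }

  def partitionSizes(sample: Seq[Long], lowerBound: Long, upperBound: Long, n: Int): Vector[Int] = {
    val counts = Array.fill(n)(0)
    sample.foreach(v => counts(partitionIndex(v, lowerBound, upperBound, n)) += 1)
    counts.toVector
  }

  // Largest partition divided by the mean: 1.0 means perfectly even.
  def skewRatio(sizes: Seq[Int]): Double = {
    val mean = sizes.sum.toDouble / sizes.size
    if (mean == 0.0) 1.0 else sizes.max / mean
  }
}
```

A ratio well above 1.0 suggests that one task will do a disproportionate share of the work, which is a hint to try another column or other bounds.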
The number of partitions (numPartitions) affects application performance as well. When the amount of data is large, having too few partitions might create the following issues:
- Reduced parallelism as shown in the example above
- Increased partition size, where some (or all) of the partitions may be very large. The effect is amplified if the data is skewed, resulting in an uneven workload among the executors
When partitions are too large, you might see the following error during shuffling, because in Spark no shuffle block can be larger than 2GB:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
According to Databricks, you can get around the problem by:
- Increasing the number of partitions in the DataFrame, which reduces the average partition size
- Increasing the spark.sql.shuffle.partitions value (more on that in the next section)
- Minimizing the data skew (e.g., by partitioning on another key)
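A rough way to apply the first suggestion is to derive a minimum partition count from the data volume. The sketch assumes you can estimate the size of the data (for example, from the Spark UI) and pick a target partition size comfortably below the 2GB limit; the helper and its parameters are hypothetical.

```scala
// Hypothetical helper: smallest partition count that keeps the average
// partition at or below a target size. In Spark 2.2 the hard ceiling is
// Integer.MAX_VALUE bytes (about 2GB) per shuffle block.
object PartitionSizing {
  val MaxBlockBytes: Long = Int.MaxValue.toLong

  def minPartitions(totalBytes: Long, targetBytes: Long): Int = {
    require(totalBytes >= 0, "size cannot be negative")
    require(targetBytes > 0 && targetBytes <= MaxBlockBytes, "target must be in (0, 2GB]")
    // Ceiling division, with at least one partition.
    math.max(1L, (totalBytes + targetBytes - 1) / targetBytes).toInt
  }
}
```

Because this bounds only the average, skewed data can still produce oversized partitions, which is why minimizing skew remains on the list.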
On the other hand, too many partitions can also hurt performance, because Spark spends more time scheduling tasks than it would with fewer partitions that still provide adequate parallelism. You might also encounter the "too many open files" error during shuffling when the number of partitions is too high.
When Db2 is the data source, the tables that the application reads might have already been pre-partitioned, for example via a partition-by-range table space with a key column. If that is the case, you may use that as your partitionColumn (key column name) value and reference value for numPartitions (number of partitions in that table space) when you create the DataFrame. Those values enable Db2 to serve the data in a highly optimized fashion, and are what we use for our application.
Observations – Shuffle Partitions
The spark.sql.shuffle.partitions parameter (default: 200) mentioned above configures the number of partitions to use when shuffling data for joins or aggregations. If your application does not do any shuffle or aggregation, then this parameter has no effect.
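scala> val df1 = sc.parallelize(1 to 100).toDF("num")
df1: org.apache.spark.sql.DataFrame = [num: int]
scala> val df2 = sc.parallelize(Seq((1,100), (2,200))).toDF("num","n")
df2: org.apache.spark.sql.DataFrame = [num: int, n: int]
scala> val df3 = df1.join(df2, "num")
df3: org.apache.spark.sql.DataFrame = [num: int, n: int]
scala> df3.rdd.getNumPartitions
res0: Int = 200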
As shown in the example above, this parameter implicitly determines the number of partitions of the DataFrame resulting from a join/aggregation operation. We left spark.sql.shuffle.partitions at its default of 200, and the DataFrame resulting from the join was created with 200 partitions.
This parameter can also affect the performance of your application, especially if the application is shuffle-heavy. Setting it too high creates an unnecessary number of "map and reduce" operations, each with only a small amount of data to work with. Setting it too low makes partitions larger and might reduce parallelism, especially when the data is skewed. Finally, because the resulting DataFrame has spark.sql.shuffle.partitions partitions, this setting also affects how the application performs in later stages.
For our application, we found the following spark.sql.shuffle.partitions values yield good results. Again, we highly encourage you to experiment with different settings for your application and environment.
- Small: 200 (default)
- Medium: ~1000
- Large: 1000 – 2001 (Spark uses a different data structure for shuffle bookkeeping when the value is greater than 2000, which may decrease the memory footprint)
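The 2000 threshold comes from how Spark records map output sizes. In Spark 2.2, each map task's MapStatus keeps a compressed size for every reduce partition when there are at most 2000 partitions, and switches to HighlyCompressedMapStatus, which mostly keeps averages instead, above that. The sketch below mirrors only that decision plus a rough size estimate; it is not Spark code.

```scala
// Mirrors only the decision Spark 2.2 makes in MapStatus.apply: with more than
// 2000 reduce partitions it switches to HighlyCompressedMapStatus.
object ShuffleBookkeeping {
  val HighlyCompressedThreshold = 2000

  def mapStatusKind(numShufflePartitions: Int): String =
    if (numShufflePartitions > HighlyCompressedThreshold) "HighlyCompressedMapStatus"
    else "CompressedMapStatus"

  // CompressedMapStatus keeps one byte per reduce partition for every map task,
  // so one shuffle is tracked with roughly mapTasks * reducePartitions bytes
  // (ignoring object overhead).
  def compressedStatusBytes(mapTasks: Int, reducePartitions: Int): Long =
    mapTasks.toLong * reducePartitions
}
```

The value itself can be changed per session, for example with spark.conf.set("spark.sql.shuffle.partitions", "2001") before running the shuffle-heavy query.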
Other Tidbits – Different Ways of Joining Tables
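With our application, we join the dimension tables with the fact table before processing the data. By default, Spark joins these DataFrames with a shuffle-based join (in Spark 2.2, usually a sort-merge join, because spark.sql.join.preferSortMergeJoin defaults to true):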
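import spark.implicits._ // enables the $"col" syntax (already imported in spark-shell)
// Hypothetical helper; url, username and password are defined as in the earlier example
def readTable(name: String) = spark.read.format("jdbc")
  .option("url", url).option("user", username).option("password", password)
  .option("dbtable", name).load()
val jdbcDFFact = readTable("TABLE_FACT")
val jdbcDFDim1 = readTable("TABLE_DIM1")
val jdbcDFDim2 = readTable("TABLE_DIM2")
val jdbcDF = jdbcDFFact
  .join(jdbcDFDim1, $"fact_key1" === $"dim1_key")
  .join(jdbcDFDim2, $"fact_key2" === $"dim2_key")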
This method induces a lot of data shuffling among the executors: Spark shuffles the data from both tables by the join key, so that rows with the same key end up in the same partition.
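To make the data movement concrete, here is a toy, single-process model of a shuffle-based join written with plain Scala collections (no Spark involved): both inputs are hash-partitioned by the join key, so matching keys meet in the same partition, and each partition is then joined on its own. The object name and data are hypothetical.

```scala
// Toy model of a shuffle-based join using plain collections (not Spark).
object ShuffleJoinModel {
  // The partition a key is sent to; both sides use the same rule.
  def partitionOf(key: Int, numPartitions: Int): Int =
    java.lang.Math.floorMod(key.hashCode, numPartitions)

  // Simulates the shuffle: every row moves to its key's partition.
  def shuffle[V](rows: Seq[(Int, V)], numPartitions: Int): Vector[Seq[(Int, V)]] = {
    val grouped = rows.groupBy { case (k, _) => partitionOf(k, numPartitions) }
    Vector.tabulate(numPartitions)(p => grouped.getOrElse(p, Seq.empty))
  }

  def join[A, B](fact: Seq[(Int, A)], dim: Seq[(Int, B)], numPartitions: Int): Seq[(Int, A, B)] = {
    val factParts = shuffle(fact, numPartitions) // both sides are moved...
    val dimParts = shuffle(dim, numPartitions)
    // ...after which each partition pair can be joined independently.
    (0 until numPartitions).flatMap { p =>
      val dimByKey = dimParts(p).groupBy(_._1)
      for {
        (k, a) <- factParts(p)
        (_, b) <- dimByKey.getOrElse(k, Seq.empty)
      } yield (k, a, b)
    }
  }
}
```

In this model every fact row and every dimension row is moved once, which is the cost the broadcast approach avoids for the large side.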
Because the dimension tables in our case are actually quite small, we can use broadcast hash join instead:
import org.apache.spark.sql.functions.broadcast
val jdbcDFFact = ...
val jdbcDFDim1 = ...
val jdbcDFDim2 = ...
val jdbcDFBroadcast = jdbcDFFact
  .join(broadcast(jdbcDFDim1), $"fact_key1" === $"dim1_key")
  .join(broadcast(jdbcDFDim2), $"fact_key2" === $"dim2_key")
With broadcast hash join, each dimension table is sent in full to every executor, so each partition of the fact table can be joined locally. Because each partition already has all the information it needs for joining, no shuffle is needed. Generally speaking, we can use this technique whenever we join a large DataFrame with a much smaller one that fits entirely in both driver and executor memory.
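The same toy approach illustrates a broadcast hash join: a hash table is built once from the small table and shared by every partition of the large one, so fact rows never leave their partition. This is a plain-collections model written for illustration (the object name and data are hypothetical), not Spark's implementation.

```scala
// Toy model of a broadcast hash join using plain collections (not Spark).
object BroadcastJoinModel {
  def join[A, B](factPartitions: Seq[Seq[(Int, A)]], dim: Seq[(Int, B)]): Seq[Seq[(Int, A, B)]] = {
    // Built once from the small table and "broadcast": every partition reads
    // the same hash table, so no fact rows move between partitions.
    val dimByKey: Map[Int, Seq[B]] =
      dim.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }
    factPartitions.map { part =>
      for {
        (k, a) <- part
        b <- dimByKey.getOrElse(k, Seq.empty)
      } yield (k, a, b)
    }
  }
}
```

Note that the output keeps the fact table's partitioning: output partition i contains only rows that came from fact partition i.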
However, because we are joining the tables before doing transformations on any of them, it is more efficient to perform the join within Db2 by specifying a SQL JOIN statement directly. We saw no significant CPU usage impact on Db2 by doing it this way.
val sqlStatement = """( SELECT * FROM TABLE_FACT
| INNER JOIN TABLE_DIM1 ON fact_key1 = dim1_key
| INNER JOIN TABLE_DIM2 ON fact_key2 = dim2_key
| ) as tmp""".stripMargin
val jdbcDFAlreadyJoined = spark.read.format("jdbc")
  .option("url", url).option("user", username).option("password", password)
  .option("dbtable", sqlStatement).load()
Other Tidbits – Full table read even with .limit()
Sometimes you want to try out your application during prototyping with a smaller set of data. Consider a code sequence like this:
val df1 = spark.read.format("jdbc").option("dbtable", "TABLE1").option(...).load()
df1.limit(100000).count()
You might think Spark will read only the first 100000 rows from the data source, but that is not the case. As of Spark 2.2, this code triggers a full table read on TABLE1, returning all rows from the data source; only then does Spark take the first 100000 rows of the DataFrame and perform the .count(). This issue is documented in JIRA SPARK-22386.
To get around this, instead of passing only the table name in the dbtable JDBC property, pass a SQL statement. For example, using Db2's "FETCH FIRST n ROWS ONLY" SQL syntax:
val sqlStatement = """( SELECT * FROM TABLE_FACT
| FETCH FIRST 100000 ROWS ONLY
| ) as tmp""".stripMargin
val df1 = spark.read.format("jdbc").option("dbtable", sqlStatement).option(...).load()
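If this pattern is used in several places, the subquery string can be generated. The helper below is hypothetical (not a Spark or Db2 API); it simply wraps a table name in a derived table that Db2 limits with FETCH FIRST, and it interpolates the table name directly, so it should only be given trusted names.

```scala
// Hypothetical helper: build a dbtable value whose row limit is applied by
// Db2, rather than by Spark after a full table read.
object Db2Limit {
  def limitedDbtable(table: String, rows: Int, alias: String = "tmp"): String = {
    require(rows > 0, "rows must be positive")
    s"(SELECT * FROM $table FETCH FIRST $rows ROWS ONLY) AS $alias"
  }
}
```

It would be passed the same way as sqlStatement above, e.g. .option("dbtable", Db2Limit.limitedDbtable("TABLE_FACT", 100000)).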
Other Tidbits – Don’t .cache() more than necessary
val df1 = spark.read.format("jdbc").option("dbtable", "TABLE1").option(...).load()
val df2 = df1.select($"col001", $"col002").groupBy(...)
Assume TABLE1 has 100 columns, from col001 to col100, and we are only interested in col001 and col002. Running the code above as-is, Spark will issue an SQL query to the data source that is logically equivalent to:
SELECT col001, col002 FROM TABLE1
However, if we put .cache() at the end of the first statement after .load(), the logically equivalent SQL query will instead become:
SELECT * FROM TABLE1
As you can imagine, the second case may take a much longer time to run than the first case, because it reads in all 100 columns of data from the data source even though we are only interested in col001 and col002.
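If the data does need to be cached, one option is to decide on the columns first and cache only those, either by calling .select(...) before .cache() or by pushing the projection into the dbtable subquery, in the same way as the FETCH FIRST example. The hypothetical helper below (not a Spark or Db2 API) builds such a subquery; as with any string-built SQL, the table and column names must come from trusted input.

```scala
// Hypothetical helper: a dbtable value that reads only the listed columns,
// so even a cached DataFrame holds just those columns.
object Db2Projection {
  def projectedDbtable(table: String, columns: Seq[String], alias: String = "tmp"): String = {
    require(columns.nonEmpty, "select at least one column")
    s"(SELECT ${columns.mkString(", ")} FROM $table) AS $alias"
  }
}
```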
Summary
- Different Spark applications have different characteristics and resource requirements. You should experiment with different memory, cores and partition settings for your application and environment.
- Your application may run faster as the total number of executor cores increases. However, the performance gain may show diminishing returns.
- With our application, the average throughput per core decreased once we exceeded 8 cores per executor.
- Memory usage in Spark highly depends on the data and how the application is written.
- The size of execution and storage memory within the unified memory region can be tuned based on your application’s needs.
- DataFrames use memory much more efficiently than RDDs.
- We get satisfactory results with our application by having 50GB memory per executor, and 50GB/200GB/400GB total memory for all executors for the small, medium, and large data sets respectively.
- Specify partitionColumn, lowerBound, upperBound, numPartitions properties in the JDBC invocation to partition the initial DataFrame and enable multiple concurrent JDBC connections.
- Choose the partitionColumn column carefully to minimize data skew.
- Too few partitions in a DataFrame will reduce parallelism and may cause each partition to be too large to process. Too many partitions will create an unnecessary number of tasks, increasing task-scheduling overhead, and possibly causing the “too many open files” error.
- Shuffle blocks must be 2GB or smaller.
- When reading a partitioned Db2 table, you may set partitionColumn to the table’s key and numPartitions to the number of partitions in the table space.
- spark.sql.shuffle.partitions implicitly determines the number of partitions of the resultant DataFrame after join/aggregation operations. Be aware of the consequences of setting this value too high or too low.
- Our application performs well with the following spark.sql.shuffle.partitions values: 200 (default), ~1000, and 1000-2001 for the small, medium, and large data sets respectively.
- Joining tables at the database-server level may be much more efficient than joining the equivalent DataFrames in Spark.
- Calling .limit() on a JDBC DataFrame still triggers a full table read (JIRA SPARK-22386); put the row limit in the SQL statement passed as dbtable instead.
- .cache() may cause more data to be read from the data source than necessary, if some of the columns in the DataFrame are not used.
References
- IBM Open Data Analytics for z/OS Publications
- Resource Tuning Recommendations for IBM z/OS Platform for Apache Spark
- Spark 2.2.0 Documentation
- Spark 2.2.0 Documentation - Configuration
- Spark 2.2.0 Documentation - Monitoring and Instrumentation
- Spark 2.2.0 Documentation - Spark SQL, DataFrames and Datasets Guide
- Spark 2.2.0 Documentation - Tuning Spark
- Mastering Apache Spark - Jacek Laskowski
- Mastering Spark SQL - Jacek Laskowski