Disaggregated Shuffle
The disaggregated shuffle service allows the bulk of the shuffle data to reside in a storage location that is separate from the compute nodes (a separate shared volume mount or an object store), which enables more efficient resource utilization. The Disaggregated Shuffle plugin uses the Spark shuffle manager API to replace the default shuffle manager: it registers a new ShuffleManager and a ShuffleDataIO component, which lets the plugin override the Spark shuffle I/O methods.
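As a minimal sketch (assuming the plugin classes are available in the Spark runtime), the following PySpark snippet shows the properties that wire the plugin into Spark. The class names and property keys are the same ones used in the submission example later in this section; the root directory is a placeholder.

# Minimal sketch of the configuration the plugin relies on (placeholder root directory).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("disaggregated-shuffle-demo")
    # Replace the built-in sort shuffle manager with the plugin's implementation.
    .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.DisaggregatedShuffleManager")
    # Override the shuffle I/O layer so shuffle blocks are read and written through the plugin.
    .config("spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.DisaggregatedShuffleDataIO")
    # Location where shuffle data is stored instead of local disk.
    .config("spark.shuffle.disaggregated.rootDir", "file:///<path-name>/")
    .config("spark.shuffle.disaggregated.folderPrefixes", "1")
    .getOrCreate()
)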
Use the Disaggregated Shuffle plugin for Spark jobs that run out of local temporary storage or experience pod failures. It also helps when local storage resources are insufficient, or as an alternative to emptyDir to boost performance. The plugin supports the following storage backends:
- Cloud Pak for Data shared volumes
- Any Spark-compatible storage such as cos://bucket, s3a://bucket, hdfs://, or file:///path
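For illustration, the shuffle root directory (spark.shuffle.disaggregated.rootDir, shown in the submission example below) can point at any of these backends. The mount path, bucket, and host names here are placeholders, not defaults.

# Illustrative rootDir values per backend (placeholders only):
root_dir_examples = {
    "cpd_shared_volume": "file:///<path-name>/",           # shared volume mounted into the Spark pods
    "object_store": "s3a://<cos-bucket-name>/",            # IBM COS / S3 bucket, no extra path
    "hdfs": "hdfs://<namenode-host>:<port>/<path-name>/",  # any Spark-compatible HDFS
}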
- Save the following Python sample file.
from pyspark.sql import SparkSession
import time


def init_spark():
    # Create (or reuse) the SparkSession and return it together with the SparkContext.
    spark = SparkSession.builder.appName("auto-scale-test").getOrCreate()
    sc = spark.sparkContext
    return spark, sc


def main():
    spark, sc = init_spark()
    partitions = [10, 5, 8, 4, 9, 4, 7]
    parallelism = sc.defaultParallelism
    for num_partitions in partitions:
        print(
            f"Running experiment with {num_partitions} partitions leveraging {parallelism} cores."
        )
        # Group 20 million integers by modulo to force a shuffle across num_partitions partitions.
        data = range(1, 20000000)
        v0 = sc.parallelize(data, parallelism)
        gr = v0.groupBy(lambda x: x % num_partitions, numPartitions=num_partitions).map(
            lambda x: (x[0], len(x[1]))
        )
        dd = gr.collect()
        print(f"Buckets {dd}")
        time.sleep(5)


if __name__ == "__main__":
    main()
- Upload the Python file to the Cloud Object Storage bucket (one way to script this step is sketched after the submission command below).
- To enable shuffle on a shared volume within Analytics Engine, you must both mount the shared volume and specify its path in the Disaggregated Shuffle configuration. Run the following curl command to submit the Spark application:
curl -k -X POST https://<cluster-url>/v4/analytics_engines/<app-id>/spark_applications \
  -H "Authorization: ZenApiKey <api-key>" \
  -H "Content-Type: application/json" \
  -d '{
    "application_details": {
      "conf": {
        "spark.hadoop.fs.s3a.endpoint": "s3.us-south.cloud-object-storage.appdomain.cloud",
        "spark.hadoop.fs.s3a.access.key": "<access-key>",
        "spark.hadoop.fs.s3a.secret.key": "<secret-key>",
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.fast.upload": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.shuffle.manager": "org.apache.spark.shuffle.sort.DisaggregatedShuffleManager",
        "spark.shuffle.sort.io.plugin.class": "org.apache.spark.shuffle.DisaggregatedShuffleDataIO",
        "spark.shuffle.disaggregated.rootDir": "file:///<path-name>/",
        "spark.shuffle.disaggregated.folderPrefixes": "1"
      },
      "application": "s3a://<cos-bucket-name>/<app-name>.py"
    },
    "volumes": [
      {
        "name": "cpd-instance::<volume-name>",
        "mount_path": "file:///<path-name>"
      }
    ]
  }'
The plugin automatically cleans up the shuffle files that it creates. However, shuffle files left behind when a Spark driver is killed or crashes are not removed, so it is recommended to clean up the shuffle directory regularly. If the shuffle files are backed by an object store, a lifecycle policy can be implemented on the bucket.
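One way to implement such a policy is an object expiration rule set through the S3 lifecycle API. The sketch below uses boto3; boto3, the one-day expiration window, and the bucket and credential placeholders are assumptions for illustration, not part of the plugin.

# Sketch: expire leftover shuffle objects after one day via an S3 lifecycle rule.
# boto3 and the 1-day window are assumptions; endpoint, bucket, and keys are placeholders.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="https://s3.us-south.cloud-object-storage.appdomain.cloud",
    aws_access_key_id="<access-key>",
    aws_secret_access_key="<secret-key>",
)
s3.put_bucket_lifecycle_configuration(
    Bucket="<shuffle-bucket-name>",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-stale-shuffle-files",
                "Filter": {"Prefix": ""},   # apply to the whole bucket
                "Status": "Enabled",
                "Expiration": {"Days": 1},  # remove objects older than one day
            }
        ]
    },
)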
Performance optimizations
- If the shuffle files are stored on a distributed file system, it is recommended to set spark.shuffle.disaggregated.folderPrefixes to 1.
- For object stores like IBM COS or AWS S3, it is recommended to set spark.shuffle.disaggregated.folderPrefixes to 10, because the folder prefix option allows the Disaggregated Shuffle plugin to distribute reads and writes over multiple prefixes. For the best performance on object stores, configure the shuffle plugin with the storage bucket only, without any extra path. For example, if the bucket is called "spark-shuffle-data", set spark.shuffle.disaggregated.rootDir to "s3a://spark-shuffle-data/". (A combined tuning sketch follows this list.)
- The Disaggregated Shuffle plugin uses write buffers of 8388608 bytes (8 MB) per task, configured with spark.shuffle.disaggregated.bufferSize. Buffers are written to the underlying file system in chunks of 2097152 bytes (2 MB), configured with spark.shuffle.disaggregated.bufferChunkSize, which by default is one quarter of the buffer size. If your file system requires bigger chunks, increase both the buffer size and the chunk size.
- Buffer sizes to read data can be configured separately. By default, Disaggregated Shuffle uses 134217728 bytes (128 MB) per task. This value can be changed by configuring spark.shuffle.disaggregated.maxBufferSizeTask.
- Each shuffle task pre-fetches the shuffle files in the background using a thread pool. The concurrency of the prefetcher is estimated based on the measured I/O latency. The maximum number of threads can be configured by setting spark.shuffle.disaggregated.maxConcurrencyTask (default: 5).
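For reference, the sketch below collects the tuning properties from this list for an object store backend. The values are the defaults documented above except for folderPrefixes, which is set to the object store recommendation, and the bucket name, which is a placeholder; these keys can be added to the conf section of the submission request.

# Tuning sketch for an object store backend; keys and defaults are taken from the notes above.
object_store_shuffle_conf = {
    "spark.shuffle.disaggregated.rootDir": "s3a://<shuffle-bucket-name>/",  # bucket only, no extra path
    "spark.shuffle.disaggregated.folderPrefixes": "10",            # 10 for object stores, 1 for distributed file systems
    "spark.shuffle.disaggregated.bufferSize": "8388608",           # 8 MB write buffer per task (default)
    "spark.shuffle.disaggregated.bufferChunkSize": "2097152",      # 2 MB write chunks (default: 1/4 of bufferSize)
    "spark.shuffle.disaggregated.maxBufferSizeTask": "134217728",  # 128 MB read buffer per task (default)
    "spark.shuffle.disaggregated.maxConcurrencyTask": "5",         # maximum prefetcher threads per task (default)
}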